rsigma_runtime/enrichment/
http.rs1use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
19use rsigma_eval::EvaluationResult;
20use rsigma_eval::pipeline::sources::ExtractExpr;
21use serde_json::Value;
22
23use super::{
24 EnrichError, EnrichErrorKind, Enricher, EnricherKind, OnError, Scope,
25 http_cache::{CacheKey, CacheOutcome, HttpResponseCache},
26 inject_enrichment,
27 template::render_template,
28};
29use crate::metrics::{MetricsHook, NoopMetrics};
30
31pub struct HttpEnricher {
37 id: String,
38 kind: EnricherKind,
39 inject_field: String,
40 method: String,
41 url: String,
42 headers: Vec<(String, String)>,
43 body: Option<String>,
44 timeout: Duration,
45 on_error: OnError,
46 scope: Scope,
47 extract: Option<ExtractExpr>,
48 client: HttpEnricherClient,
49 cache: HttpResponseCache,
50 metrics: Arc<dyn MetricsHook>,
51}
52
53#[derive(Clone)]
61pub struct HttpEnricherClient(Arc<reqwest::Client>);
62
63impl HttpEnricherClient {
64 pub fn from_reqwest(client: Arc<reqwest::Client>) -> Self {
67 Self(client)
68 }
69 fn inner(&self) -> &reqwest::Client {
72 &self.0
73 }
74}
75
76pub fn build_default_http_client() -> Result<HttpEnricherClient, String> {
80 reqwest::Client::builder()
81 .build()
82 .map(|c| HttpEnricherClient(Arc::new(c)))
83 .map_err(|e| format!("reqwest client build failed: {e}"))
84}
85
86impl HttpEnricher {
87 #[allow(clippy::too_many_arguments)]
94 pub fn new(
95 id: String,
96 kind: EnricherKind,
97 inject_field: String,
98 method: String,
99 url: String,
100 headers: Vec<(String, String)>,
101 body: Option<String>,
102 timeout: Duration,
103 on_error: OnError,
104 scope: Scope,
105 extract: Option<ExtractExpr>,
106 client: HttpEnricherClient,
107 cache: HttpResponseCache,
108 ) -> Self {
109 Self {
110 id,
111 kind,
112 inject_field,
113 method: method.to_ascii_uppercase(),
114 url,
115 headers,
116 body,
117 timeout,
118 on_error,
119 scope,
120 extract,
121 client,
122 cache,
123 metrics: Arc::new(NoopMetrics),
124 }
125 }
126
127 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
134 metrics.register_http_enricher_cache(&self.id);
135 self.metrics = metrics;
136 self
137 }
138
139 pub fn cache(&self) -> &HttpResponseCache {
142 &self.cache
143 }
144}
145
146#[async_trait]
147impl Enricher for HttpEnricher {
148 fn kind(&self) -> EnricherKind {
149 self.kind
150 }
151 fn id(&self) -> &str {
152 &self.id
153 }
154 fn inject_field(&self) -> &str {
155 &self.inject_field
156 }
157 fn timeout(&self) -> Duration {
158 self.timeout
159 }
160 fn scope(&self) -> &Scope {
161 &self.scope
162 }
163 fn on_error(&self) -> OnError {
164 self.on_error
165 }
166
167 async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError> {
168 let url = render_template(&self.url, result);
169 let body = self.body.as_ref().map(|b| render_template(b, result));
170
171 let cache_key = CacheKey::new(&self.method, &url, body.as_deref().map(str::as_bytes));
172 let (outcome, cached) = self.cache.lookup(&cache_key);
173 match outcome {
174 CacheOutcome::Hit => self.metrics.on_enrichment_http_cache_hit(&self.id),
175 CacheOutcome::Miss => self.metrics.on_enrichment_http_cache_miss(&self.id),
176 CacheOutcome::Expired => {
177 self.metrics.on_enrichment_http_cache_expiration(&self.id);
178 self.metrics.on_enrichment_http_cache_miss(&self.id);
179 }
180 }
181 if let Some(cached_value) = cached {
182 let extracted = self.maybe_extract(&cached_value)?;
183 inject_enrichment(result, &self.inject_field, extracted);
184 return Ok(());
185 }
186
187 let mut header_map = HeaderMap::with_capacity(self.headers.len());
188 for (name, value_template) in &self.headers {
189 let rendered = render_template(value_template, result);
190 let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| EnrichError {
191 enricher_id: self.id.clone(),
192 kind: EnrichErrorKind::Fetch(format!("invalid header name '{name}': {e}")),
193 })?;
194 let header_value = HeaderValue::from_str(&rendered).map_err(|e| EnrichError {
195 enricher_id: self.id.clone(),
196 kind: EnrichErrorKind::Fetch(format!("invalid header value for '{name}': {e}")),
197 })?;
198 header_map.insert(header_name, header_value);
199 }
200
201 let method =
202 reqwest::Method::from_bytes(self.method.as_bytes()).map_err(|e| EnrichError {
203 enricher_id: self.id.clone(),
204 kind: EnrichErrorKind::Fetch(format!("invalid method '{}': {e}", self.method)),
205 })?;
206
207 let mut req = self
208 .client
209 .inner()
210 .request(method, &url)
211 .headers(header_map);
212 if let Some(b) = &body {
213 req = req.body(b.clone());
214 }
215 let resp = req.send().await.map_err(|e| EnrichError {
216 enricher_id: self.id.clone(),
217 kind: if e.is_timeout() {
218 EnrichErrorKind::Timeout
219 } else {
220 EnrichErrorKind::Fetch(format!("{e}"))
221 },
222 })?;
223
224 let status = resp.status();
225 if !status.is_success() {
226 return Err(EnrichError {
227 enricher_id: self.id.clone(),
228 kind: EnrichErrorKind::Fetch(format!("HTTP {status}")),
229 });
230 }
231
232 let bytes = resp.bytes().await.map_err(|e| EnrichError {
233 enricher_id: self.id.clone(),
234 kind: EnrichErrorKind::Fetch(format!("body read: {e}")),
235 })?;
236 let parsed: Value = serde_json::from_slice(&bytes).map_err(|e| EnrichError {
237 enricher_id: self.id.clone(),
238 kind: EnrichErrorKind::Parse(format!("JSON: {e}")),
239 })?;
240
241 self.cache.insert(cache_key, parsed.clone());
245
246 let extracted = self.maybe_extract(&parsed)?;
247 inject_enrichment(result, &self.inject_field, extracted);
248 Ok(())
249 }
250}
251
252impl HttpEnricher {
253 fn maybe_extract(&self, value: &Value) -> Result<Value, EnrichError> {
256 match &self.extract {
257 None => Ok(value.clone()),
258 Some(expr) => {
259 crate::sources::extract::apply_extract(value, expr).map_err(|e| EnrichError {
260 enricher_id: self.id.clone(),
261 kind: EnrichErrorKind::Extract(format!("{}", e.kind)),
262 })
263 }
264 }
265 }
266}