rsigma_runtime/enrichment/
http.rs1use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
19use rsigma_eval::EvaluationResult;
20use rsigma_eval::pipeline::sources::ExtractExpr;
21use serde_json::Value;
22
23use super::{
24 EnrichError, EnrichErrorKind, Enricher, EnricherKind, OnError, Scope,
25 http_cache::{CacheKey, CacheOutcome, HttpResponseCache},
26 inject_enrichment,
27 template::render_template,
28};
29use crate::metrics::{MetricsHook, NoopMetrics};
30use crate::sources::MAX_SOURCE_RESPONSE_BYTES;
31
/// Default upper bound, in bytes, on an HTTP enricher response body.
///
/// Aliases [`MAX_SOURCE_RESPONSE_BYTES`]. [`HttpEnricher::new`] uses it as
/// the initial `max_response_bytes`.
pub const DEFAULT_ENRICHER_MAX_RESPONSE_BYTES: usize = MAX_SOURCE_RESPONSE_BYTES;
37
/// Enricher that issues an HTTP request per evaluation result, parses the
/// response as JSON, and injects the (optionally extracted) value into the
/// result under `inject_field`.
pub struct HttpEnricher {
    /// Identifier attached to every `EnrichError` and passed to metrics hooks.
    id: String,
    /// Value returned by `Enricher::kind`.
    kind: EnricherKind,
    /// Field name handed to `inject_enrichment` when storing the enrichment.
    inject_field: String,
    /// HTTP method; upper-cased by `new` and parsed into a `reqwest::Method`
    /// on every request.
    method: String,
    /// URL template, rendered against the result with `render_template`.
    url: String,
    /// `(header name, header value template)` pairs; only the value is
    /// rendered against the result.
    headers: Vec<(String, String)>,
    /// Optional request body template, rendered against the result.
    body: Option<String>,
    /// Value returned by `Enricher::timeout`.
    // NOTE(review): not applied to the request inside this file; presumably
    // enforced by the caller of `enrich` — confirm.
    timeout: Duration,
    /// Value returned by `Enricher::on_error`.
    on_error: OnError,
    /// Value returned by `Enricher::scope`.
    scope: Scope,
    /// Optional extraction expression applied to the parsed JSON response.
    extract: Option<ExtractExpr>,
    /// Shared HTTP client used to send requests.
    client: HttpEnricherClient,
    /// Response cache consulted before, and populated after, each request.
    cache: HttpResponseCache,
    /// Metrics sink for cache hit/miss/expiration events; `new` installs
    /// `NoopMetrics`.
    metrics: Arc<dyn MetricsHook>,
    /// Maximum accepted response size in bytes; `new` sets it to
    /// `DEFAULT_ENRICHER_MAX_RESPONSE_BYTES`.
    max_response_bytes: usize,
}
65
/// Handle around a shared `reqwest::Client`.
///
/// The client is held in an `Arc`, so cloning the handle clones the `Arc`
/// rather than the client itself.
#[derive(Clone)]
pub struct HttpEnricherClient(Arc<reqwest::Client>);
75
76impl HttpEnricherClient {
77 pub fn from_reqwest(client: Arc<reqwest::Client>) -> Self {
80 Self(client)
81 }
82 fn inner(&self) -> &reqwest::Client {
85 &self.0
86 }
87}
88
89pub fn build_default_http_client() -> Result<HttpEnricherClient, String> {
99 let resolver =
100 crate::egress::EgressFilteredResolver::new(crate::egress::default_egress_policy())
101 .into_dns_resolver();
102 reqwest::Client::builder()
103 .dns_resolver(resolver)
104 .build()
105 .map(|c| HttpEnricherClient(Arc::new(c)))
106 .map_err(|e| format!("reqwest client build failed: {e}"))
107}
108
109impl HttpEnricher {
110 #[allow(clippy::too_many_arguments)]
117 pub fn new(
118 id: String,
119 kind: EnricherKind,
120 inject_field: String,
121 method: String,
122 url: String,
123 headers: Vec<(String, String)>,
124 body: Option<String>,
125 timeout: Duration,
126 on_error: OnError,
127 scope: Scope,
128 extract: Option<ExtractExpr>,
129 client: HttpEnricherClient,
130 cache: HttpResponseCache,
131 ) -> Self {
132 Self {
133 id,
134 kind,
135 inject_field,
136 method: method.to_ascii_uppercase(),
137 url,
138 headers,
139 body,
140 timeout,
141 on_error,
142 scope,
143 extract,
144 client,
145 cache,
146 metrics: Arc::new(NoopMetrics),
147 max_response_bytes: DEFAULT_ENRICHER_MAX_RESPONSE_BYTES,
148 }
149 }
150
151 pub fn with_max_response_bytes(mut self, max_bytes: usize) -> Self {
158 self.max_response_bytes = max_bytes;
159 self
160 }
161
162 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
169 metrics.register_http_enricher_cache(&self.id);
170 self.metrics = metrics;
171 self
172 }
173
174 pub fn cache(&self) -> &HttpResponseCache {
177 &self.cache
178 }
179}
180
181#[async_trait]
182impl Enricher for HttpEnricher {
183 fn kind(&self) -> EnricherKind {
184 self.kind
185 }
186 fn id(&self) -> &str {
187 &self.id
188 }
189 fn inject_field(&self) -> &str {
190 &self.inject_field
191 }
192 fn timeout(&self) -> Duration {
193 self.timeout
194 }
195 fn scope(&self) -> &Scope {
196 &self.scope
197 }
198 fn on_error(&self) -> OnError {
199 self.on_error
200 }
201
202 async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError> {
203 let url = render_template(&self.url, result);
204 let body = self.body.as_ref().map(|b| render_template(b, result));
205
206 let cache_key = CacheKey::new(&self.method, &url, body.as_deref().map(str::as_bytes));
207 let (outcome, cached) = self.cache.lookup(&cache_key);
208 match outcome {
209 CacheOutcome::Hit => self.metrics.on_enrichment_http_cache_hit(&self.id),
210 CacheOutcome::Miss => self.metrics.on_enrichment_http_cache_miss(&self.id),
211 CacheOutcome::Expired => {
212 self.metrics.on_enrichment_http_cache_expiration(&self.id);
213 self.metrics.on_enrichment_http_cache_miss(&self.id);
214 }
215 }
216 if let Some(cached_value) = cached {
217 let extracted = self.maybe_extract(&cached_value)?;
218 inject_enrichment(result, &self.inject_field, extracted);
219 return Ok(());
220 }
221
222 let mut header_map = HeaderMap::with_capacity(self.headers.len());
223 for (name, value_template) in &self.headers {
224 let rendered = render_template(value_template, result);
225 let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| EnrichError {
226 enricher_id: self.id.clone(),
227 kind: EnrichErrorKind::Fetch(format!("invalid header name '{name}': {e}")),
228 })?;
229 let header_value = HeaderValue::from_str(&rendered).map_err(|e| EnrichError {
230 enricher_id: self.id.clone(),
231 kind: EnrichErrorKind::Fetch(format!("invalid header value for '{name}': {e}")),
232 })?;
233 header_map.insert(header_name, header_value);
234 }
235
236 let method =
237 reqwest::Method::from_bytes(self.method.as_bytes()).map_err(|e| EnrichError {
238 enricher_id: self.id.clone(),
239 kind: EnrichErrorKind::Fetch(format!("invalid method '{}': {e}", self.method)),
240 })?;
241
242 let mut req = self
243 .client
244 .inner()
245 .request(method, &url)
246 .headers(header_map);
247 if let Some(b) = &body {
248 req = req.body(b.clone());
249 }
250 let resp = req.send().await.map_err(|e| EnrichError {
251 enricher_id: self.id.clone(),
252 kind: if e.is_timeout() {
253 EnrichErrorKind::Timeout
254 } else {
255 EnrichErrorKind::Fetch(format!("{e}"))
256 },
257 })?;
258
259 let status = resp.status();
260 if !status.is_success() {
261 return Err(EnrichError {
262 enricher_id: self.id.clone(),
263 kind: EnrichErrorKind::Fetch(format!("HTTP {status}")),
264 });
265 }
266
267 if let Some(content_length) = resp.content_length()
271 && content_length as usize > self.max_response_bytes
272 {
273 return Err(EnrichError {
274 enricher_id: self.id.clone(),
275 kind: EnrichErrorKind::Fetch(format!(
276 "HTTP response Content-Length ({content_length} bytes) exceeds {} byte limit",
277 self.max_response_bytes,
278 )),
279 });
280 }
281 let bytes = read_body_capped(resp, self.max_response_bytes, &self.id).await?;
282 let parsed: Value = serde_json::from_slice(&bytes).map_err(|e| EnrichError {
283 enricher_id: self.id.clone(),
284 kind: EnrichErrorKind::Parse(format!("JSON: {e}")),
285 })?;
286
287 self.cache.insert(cache_key, parsed.clone());
291
292 let extracted = self.maybe_extract(&parsed)?;
293 inject_enrichment(result, &self.inject_field, extracted);
294 Ok(())
295 }
296}
297
298async fn read_body_capped(
302 mut response: reqwest::Response,
303 max_bytes: usize,
304 enricher_id: &str,
305) -> Result<Vec<u8>, EnrichError> {
306 let mut buf: Vec<u8> = Vec::new();
307 while let Some(chunk) = response.chunk().await.map_err(|e| EnrichError {
308 enricher_id: enricher_id.to_string(),
309 kind: EnrichErrorKind::Fetch(format!("body read: {e}")),
310 })? {
311 if buf.len() + chunk.len() > max_bytes {
312 return Err(EnrichError {
313 enricher_id: enricher_id.to_string(),
314 kind: EnrichErrorKind::Fetch(format!(
315 "HTTP response body exceeds {max_bytes} byte limit"
316 )),
317 });
318 }
319 buf.extend_from_slice(&chunk);
320 }
321 Ok(buf)
322}
323
324impl HttpEnricher {
325 fn maybe_extract(&self, value: &Value) -> Result<Value, EnrichError> {
328 match &self.extract {
329 None => Ok(value.clone()),
330 Some(expr) => {
331 crate::sources::extract::apply_extract(value, expr).map_err(|e| EnrichError {
332 enricher_id: self.id.clone(),
333 kind: EnrichErrorKind::Extract(format!("{}", e.kind)),
334 })
335 }
336 }
337 }
338}