Skip to main content

rsigma_runtime/enrichment/
http.rs

1//! `HttpEnricher`: per-result HTTP fetch with optional response cache.
2//!
3//! Builds a [`reqwest::Request`] from a template-expanded URL, optional
4//! template-expanded headers, and an optional template-expanded body.
5//! The response is parsed as JSON; if an `extract` expression is set,
6//! it is applied via [`crate::sources::extract::apply_extract`] using
7//! the existing dynamic-pipelines extractor stack (jq / jsonpath / cel)
8//! so operators learn one mental model, not two.
9//!
10//! Optional in-memory response cache via [`super::http_cache::HttpResponseCache`]
11//! keyed on `(method, url, body_hash)` with configurable TTL. Mandatory
12//! in practice for rate-limited APIs (VirusTotal: 4 req/min free tier).
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
19use rsigma_eval::EvaluationResult;
20use rsigma_eval::pipeline::sources::ExtractExpr;
21use serde_json::Value;
22
23use super::{
24    EnrichError, EnrichErrorKind, Enricher, EnricherKind, OnError, Scope,
25    http_cache::{CacheKey, CacheOutcome, HttpResponseCache},
26    inject_enrichment,
27    template::render_template,
28};
29use crate::metrics::{MetricsHook, NoopMetrics};
30use crate::sources::MAX_SOURCE_RESPONSE_BYTES;
31
/// Default cap on the HTTP enricher response body, mirroring
/// [`crate::sources::MAX_SOURCE_RESPONSE_BYTES`]. An untrusted upstream
/// that streams an unbounded body cannot OOM the daemon: the chunked
/// reader bails out once the cap is reached.
///
/// [`HttpEnricher::new`] installs this value as the initial limit; use
/// [`HttpEnricher::with_max_response_bytes`] to override it per instance.
pub const DEFAULT_ENRICHER_MAX_RESPONSE_BYTES: usize = MAX_SOURCE_RESPONSE_BYTES;
37
/// One HTTP enricher instance.
///
/// Constructed by the daemon config loader. The `Arc<reqwest::Client>`
/// is shared across all HTTP enrichers in the same daemon process so
/// connection pooling works at the process level rather than per-config-block.
pub struct HttpEnricher {
    /// Identifier reported by `Enricher::id`, attached to every
    /// `EnrichError` and passed to the metrics hook.
    id: String,
    /// Value reported by `Enricher::kind`.
    kind: EnricherKind,
    /// Field name handed to `inject_enrichment` when storing the result.
    inject_field: String,
    /// HTTP method; normalised to ASCII upper case by `HttpEnricher::new`
    /// and parsed into a `reqwest::Method` on every fetch.
    method: String,
    /// URL template, rendered against the evaluation result per call.
    url: String,
    /// `(name, value_template)` pairs. Only the value is template-rendered;
    /// the name is used verbatim.
    headers: Vec<(String, String)>,
    /// Optional request-body template, rendered per call.
    body: Option<String>,
    /// Value reported by `Enricher::timeout`. `enrich` does not apply it
    /// to the outgoing request itself.
    // NOTE(review): presumably the pipeline driving `Enricher` enforces
    // this deadline — confirm against the caller.
    timeout: Duration,
    /// Value reported by `Enricher::on_error`.
    on_error: OnError,
    /// Value reported by `Enricher::scope`.
    scope: Scope,
    /// Optional expression applied to the parsed JSON before injection;
    /// `None` injects the parsed response unchanged.
    extract: Option<ExtractExpr>,
    /// Shared HTTP client handle used to send requests.
    client: HttpEnricherClient,
    /// Response cache consulted before, and filled after, each fetch.
    cache: HttpResponseCache,
    /// Sink for cache hit / miss / expiration events.
    metrics: Arc<dyn MetricsHook>,
    /// Maximum number of bytes to read from the upstream response.
    /// Defaults to [`DEFAULT_ENRICHER_MAX_RESPONSE_BYTES`]. Configurable
    /// via [`HttpEnricher::with_max_response_bytes`] when an enricher
    /// genuinely needs a larger payload (and the operator has accepted
    /// the memory cost).
    max_response_bytes: usize,
}
65
66/// Opaque handle around a process-wide `reqwest::Client`. Constructed by
67/// [`build_default_http_client`] and passed by [`Arc`] to every
68/// [`HttpEnricher`] so connection pooling works at the daemon level.
69///
70/// Wrapping the `reqwest::Client` keeps `reqwest` an internal dependency
71/// of `rsigma-runtime` so consumer crates (e.g. `rsigma-cli`) do not
72/// need a direct dep just to wire enrichers together.
73#[derive(Clone)]
74pub struct HttpEnricherClient(Arc<reqwest::Client>);
75
76impl HttpEnricherClient {
77    /// Wrap an existing `reqwest::Client`. Useful for tests that want to
78    /// stub out the client (e.g. a `wiremock`-backed tower stack).
79    pub fn from_reqwest(client: Arc<reqwest::Client>) -> Self {
80        Self(client)
81    }
82    /// Access the inner client. Crate-private so external code goes
83    /// through the wrapper.
84    fn inner(&self) -> &reqwest::Client {
85        &self.0
86    }
87}
88
89/// Build the default shared HTTP client used by the daemon's enrichment
90/// pipeline. All HTTP enrichers in the same daemon process share one
91/// client to amortize TLS handshakes and DNS resolution.
92///
93/// The client is wired to the
94/// [`default_egress_policy`](crate::egress::default_egress_policy) via a
95/// custom DNS resolver so an enrichment URL that resolves to a denied
96/// address (cloud metadata, link-local) fails fast at connect time
97/// rather than completing a request to a sensitive endpoint.
98pub fn build_default_http_client() -> Result<HttpEnricherClient, String> {
99    let resolver =
100        crate::egress::EgressFilteredResolver::new(crate::egress::default_egress_policy())
101            .into_dns_resolver();
102    reqwest::Client::builder()
103        .dns_resolver(resolver)
104        .build()
105        .map(|c| HttpEnricherClient(Arc::new(c)))
106        .map_err(|e| format!("reqwest client build failed: {e}"))
107}
108
109impl HttpEnricher {
110    /// Build a new enricher.
111    ///
112    /// `client` is shared at the process level. `cache` may be a
113    /// disabled cache (`HttpResponseCache::new(Duration::from_secs(0))`)
114    /// when `cache_ttl` is unset; the lookup path treats that as "always
115    /// miss".
116    #[allow(clippy::too_many_arguments)]
117    pub fn new(
118        id: String,
119        kind: EnricherKind,
120        inject_field: String,
121        method: String,
122        url: String,
123        headers: Vec<(String, String)>,
124        body: Option<String>,
125        timeout: Duration,
126        on_error: OnError,
127        scope: Scope,
128        extract: Option<ExtractExpr>,
129        client: HttpEnricherClient,
130        cache: HttpResponseCache,
131    ) -> Self {
132        Self {
133            id,
134            kind,
135            inject_field,
136            method: method.to_ascii_uppercase(),
137            url,
138            headers,
139            body,
140            timeout,
141            on_error,
142            scope,
143            extract,
144            client,
145            cache,
146            metrics: Arc::new(NoopMetrics),
147            max_response_bytes: DEFAULT_ENRICHER_MAX_RESPONSE_BYTES,
148        }
149    }
150
151    /// Override the maximum response-body size this enricher will read.
152    ///
153    /// The default is [`DEFAULT_ENRICHER_MAX_RESPONSE_BYTES`]. Setting a
154    /// smaller value can help when consuming many small enrichment
155    /// payloads concurrently and tightening the per-call memory bound is
156    /// worth the rejection risk on the occasional larger response.
157    pub fn with_max_response_bytes(mut self, max_bytes: usize) -> Self {
158        self.max_response_bytes = max_bytes;
159        self
160    }
161
162    /// Replace the metrics hook this enricher reports cache events into.
163    ///
164    /// Pre-registers the three HTTP-cache counter label sets for this
165    /// enricher's `id` so `rsigma_enrichment_http_cache_{hits,misses,
166    /// expirations}_total{...}` are emitted on `/metrics` from the
167    /// first scrape, even before the enricher has run.
168    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
169        metrics.register_http_enricher_cache(&self.id);
170        self.metrics = metrics;
171        self
172    }
173
174    /// Read-only view of the response cache. Used by the metrics layer
175    /// to expose cache hit/miss/expiration counters.
176    pub fn cache(&self) -> &HttpResponseCache {
177        &self.cache
178    }
179}
180
181#[async_trait]
182impl Enricher for HttpEnricher {
183    fn kind(&self) -> EnricherKind {
184        self.kind
185    }
186    fn id(&self) -> &str {
187        &self.id
188    }
189    fn inject_field(&self) -> &str {
190        &self.inject_field
191    }
192    fn timeout(&self) -> Duration {
193        self.timeout
194    }
195    fn scope(&self) -> &Scope {
196        &self.scope
197    }
198    fn on_error(&self) -> OnError {
199        self.on_error
200    }
201
202    async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError> {
203        let url = render_template(&self.url, result);
204        let body = self.body.as_ref().map(|b| render_template(b, result));
205
206        let cache_key = CacheKey::new(&self.method, &url, body.as_deref().map(str::as_bytes));
207        let (outcome, cached) = self.cache.lookup(&cache_key);
208        match outcome {
209            CacheOutcome::Hit => self.metrics.on_enrichment_http_cache_hit(&self.id),
210            CacheOutcome::Miss => self.metrics.on_enrichment_http_cache_miss(&self.id),
211            CacheOutcome::Expired => {
212                self.metrics.on_enrichment_http_cache_expiration(&self.id);
213                self.metrics.on_enrichment_http_cache_miss(&self.id);
214            }
215        }
216        if let Some(cached_value) = cached {
217            let extracted = self.maybe_extract(&cached_value)?;
218            inject_enrichment(result, &self.inject_field, extracted);
219            return Ok(());
220        }
221
222        let mut header_map = HeaderMap::with_capacity(self.headers.len());
223        for (name, value_template) in &self.headers {
224            let rendered = render_template(value_template, result);
225            let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| EnrichError {
226                enricher_id: self.id.clone(),
227                kind: EnrichErrorKind::Fetch(format!("invalid header name '{name}': {e}")),
228            })?;
229            let header_value = HeaderValue::from_str(&rendered).map_err(|e| EnrichError {
230                enricher_id: self.id.clone(),
231                kind: EnrichErrorKind::Fetch(format!("invalid header value for '{name}': {e}")),
232            })?;
233            header_map.insert(header_name, header_value);
234        }
235
236        let method =
237            reqwest::Method::from_bytes(self.method.as_bytes()).map_err(|e| EnrichError {
238                enricher_id: self.id.clone(),
239                kind: EnrichErrorKind::Fetch(format!("invalid method '{}': {e}", self.method)),
240            })?;
241
242        let mut req = self
243            .client
244            .inner()
245            .request(method, &url)
246            .headers(header_map);
247        if let Some(b) = &body {
248            req = req.body(b.clone());
249        }
250        let resp = req.send().await.map_err(|e| EnrichError {
251            enricher_id: self.id.clone(),
252            kind: if e.is_timeout() {
253                EnrichErrorKind::Timeout
254            } else {
255                EnrichErrorKind::Fetch(format!("{e}"))
256            },
257        })?;
258
259        let status = resp.status();
260        if !status.is_success() {
261            return Err(EnrichError {
262                enricher_id: self.id.clone(),
263                kind: EnrichErrorKind::Fetch(format!("HTTP {status}")),
264            });
265        }
266
267        // Reject the response up-front if the upstream advertises a body
268        // size beyond the cap; otherwise stream chunks and bail as soon
269        // as the accumulated size crosses the cap.
270        if let Some(content_length) = resp.content_length()
271            && content_length as usize > self.max_response_bytes
272        {
273            return Err(EnrichError {
274                enricher_id: self.id.clone(),
275                kind: EnrichErrorKind::Fetch(format!(
276                    "HTTP response Content-Length ({content_length} bytes) exceeds {} byte limit",
277                    self.max_response_bytes,
278                )),
279            });
280        }
281        let bytes = read_body_capped(resp, self.max_response_bytes, &self.id).await?;
282        let parsed: Value = serde_json::from_slice(&bytes).map_err(|e| EnrichError {
283            enricher_id: self.id.clone(),
284            kind: EnrichErrorKind::Parse(format!("JSON: {e}")),
285        })?;
286
287        // Cache the parsed value before extract: a different enricher
288        // sharing the same URL with a different `extract` benefits from
289        // the cached upstream JSON.
290        self.cache.insert(cache_key, parsed.clone());
291
292        let extracted = self.maybe_extract(&parsed)?;
293        inject_enrichment(result, &self.inject_field, extracted);
294        Ok(())
295    }
296}
297
298/// Read a `reqwest::Response` body in chunks, bailing as soon as the
299/// accumulated byte count exceeds `max_bytes`. Mirrors the source-side
300/// helper so the daemon's two HTTP surfaces enforce the same hard cap.
301async fn read_body_capped(
302    mut response: reqwest::Response,
303    max_bytes: usize,
304    enricher_id: &str,
305) -> Result<Vec<u8>, EnrichError> {
306    let mut buf: Vec<u8> = Vec::new();
307    while let Some(chunk) = response.chunk().await.map_err(|e| EnrichError {
308        enricher_id: enricher_id.to_string(),
309        kind: EnrichErrorKind::Fetch(format!("body read: {e}")),
310    })? {
311        if buf.len() + chunk.len() > max_bytes {
312            return Err(EnrichError {
313                enricher_id: enricher_id.to_string(),
314                kind: EnrichErrorKind::Fetch(format!(
315                    "HTTP response body exceeds {max_bytes} byte limit"
316                )),
317            });
318        }
319        buf.extend_from_slice(&chunk);
320    }
321    Ok(buf)
322}
323
324impl HttpEnricher {
325    /// Apply the configured `extract` expression to `value`. Returns the
326    /// raw value untouched when no extract is configured.
327    fn maybe_extract(&self, value: &Value) -> Result<Value, EnrichError> {
328        match &self.extract {
329            None => Ok(value.clone()),
330            Some(expr) => {
331                crate::sources::extract::apply_extract(value, expr).map_err(|e| EnrichError {
332                    enricher_id: self.id.clone(),
333                    kind: EnrichErrorKind::Extract(format!("{}", e.kind)),
334                })
335            }
336        }
337    }
338}