Skip to main content

rsigma_runtime/enrichment/
http.rs

1//! `HttpEnricher`: per-result HTTP fetch with optional response cache.
2//!
3//! Builds a [`reqwest::Request`] from a template-expanded URL, optional
4//! template-expanded headers, and an optional template-expanded body.
5//! The response is parsed as JSON; if an `extract` expression is set,
6//! it is applied via [`crate::sources::extract::apply_extract`] using
7//! the existing dynamic-pipelines extractor stack (jq / jsonpath / cel)
8//! so operators learn one mental model, not two.
9//!
10//! Optional in-memory response cache via [`super::http_cache::HttpResponseCache`]
11//! keyed on `(method, url, body_hash)` with configurable TTL. Mandatory
12//! in practice for rate-limited APIs (VirusTotal: 4 req/min free tier).
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
19use rsigma_eval::EvaluationResult;
20use rsigma_eval::pipeline::sources::ExtractExpr;
21use serde_json::Value;
22
23use super::{
24    EnrichError, EnrichErrorKind, Enricher, EnricherKind, OnError, Scope,
25    http_cache::{CacheKey, CacheOutcome, HttpResponseCache},
26    inject_enrichment,
27    template::render_template,
28};
29use crate::metrics::{MetricsHook, NoopMetrics};
30
31/// One HTTP enricher instance.
32///
33/// Constructed by the daemon config loader. The `Arc<reqwest::Client>`
34/// is shared across all HTTP enrichers in the same daemon process so
35/// connection pooling works at the process level rather than per-config-block.
36pub struct HttpEnricher {
37    id: String,
38    kind: EnricherKind,
39    inject_field: String,
40    method: String,
41    url: String,
42    headers: Vec<(String, String)>,
43    body: Option<String>,
44    timeout: Duration,
45    on_error: OnError,
46    scope: Scope,
47    extract: Option<ExtractExpr>,
48    client: HttpEnricherClient,
49    cache: HttpResponseCache,
50    metrics: Arc<dyn MetricsHook>,
51}
52
53/// Opaque handle around a process-wide `reqwest::Client`. Constructed by
54/// [`build_default_http_client`] and passed by [`Arc`] to every
55/// [`HttpEnricher`] so connection pooling works at the daemon level.
56///
57/// Wrapping the `reqwest::Client` keeps `reqwest` an internal dependency
58/// of `rsigma-runtime` so consumer crates (e.g. `rsigma-cli`) do not
59/// need a direct dep just to wire enrichers together.
60#[derive(Clone)]
61pub struct HttpEnricherClient(Arc<reqwest::Client>);
62
63impl HttpEnricherClient {
64    /// Wrap an existing `reqwest::Client`. Useful for tests that want to
65    /// stub out the client (e.g. a `wiremock`-backed tower stack).
66    pub fn from_reqwest(client: Arc<reqwest::Client>) -> Self {
67        Self(client)
68    }
69    /// Access the inner client. Crate-private so external code goes
70    /// through the wrapper.
71    fn inner(&self) -> &reqwest::Client {
72        &self.0
73    }
74}
75
76/// Build the default shared HTTP client used by the daemon's enrichment
77/// pipeline. All HTTP enrichers in the same daemon process share one
78/// client to amortize TLS handshakes and DNS resolution.
79pub fn build_default_http_client() -> Result<HttpEnricherClient, String> {
80    reqwest::Client::builder()
81        .build()
82        .map(|c| HttpEnricherClient(Arc::new(c)))
83        .map_err(|e| format!("reqwest client build failed: {e}"))
84}
85
86impl HttpEnricher {
87    /// Build a new enricher.
88    ///
89    /// `client` is shared at the process level. `cache` may be a
90    /// disabled cache ([`HttpResponseCache::new(Duration::from_secs(0))`])
91    /// when `cache_ttl` is unset; the lookup path treats that as "always
92    /// miss".
93    #[allow(clippy::too_many_arguments)]
94    pub fn new(
95        id: String,
96        kind: EnricherKind,
97        inject_field: String,
98        method: String,
99        url: String,
100        headers: Vec<(String, String)>,
101        body: Option<String>,
102        timeout: Duration,
103        on_error: OnError,
104        scope: Scope,
105        extract: Option<ExtractExpr>,
106        client: HttpEnricherClient,
107        cache: HttpResponseCache,
108    ) -> Self {
109        Self {
110            id,
111            kind,
112            inject_field,
113            method: method.to_ascii_uppercase(),
114            url,
115            headers,
116            body,
117            timeout,
118            on_error,
119            scope,
120            extract,
121            client,
122            cache,
123            metrics: Arc::new(NoopMetrics),
124        }
125    }
126
127    /// Replace the metrics hook this enricher reports cache events into.
128    ///
129    /// Pre-registers the three HTTP-cache counter label sets for this
130    /// enricher's `id` so `rsigma_enrichment_http_cache_{hits,misses,
131    /// expirations}_total{...}` are emitted on `/metrics` from the
132    /// first scrape, even before the enricher has run.
133    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
134        metrics.register_http_enricher_cache(&self.id);
135        self.metrics = metrics;
136        self
137    }
138
139    /// Read-only view of the response cache. Used by the metrics layer
140    /// to expose cache hit/miss/expiration counters.
141    pub fn cache(&self) -> &HttpResponseCache {
142        &self.cache
143    }
144}
145
146#[async_trait]
147impl Enricher for HttpEnricher {
148    fn kind(&self) -> EnricherKind {
149        self.kind
150    }
151    fn id(&self) -> &str {
152        &self.id
153    }
154    fn inject_field(&self) -> &str {
155        &self.inject_field
156    }
157    fn timeout(&self) -> Duration {
158        self.timeout
159    }
160    fn scope(&self) -> &Scope {
161        &self.scope
162    }
163    fn on_error(&self) -> OnError {
164        self.on_error
165    }
166
167    async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError> {
168        let url = render_template(&self.url, result);
169        let body = self.body.as_ref().map(|b| render_template(b, result));
170
171        let cache_key = CacheKey::new(&self.method, &url, body.as_deref().map(str::as_bytes));
172        let (outcome, cached) = self.cache.lookup(&cache_key);
173        match outcome {
174            CacheOutcome::Hit => self.metrics.on_enrichment_http_cache_hit(&self.id),
175            CacheOutcome::Miss => self.metrics.on_enrichment_http_cache_miss(&self.id),
176            CacheOutcome::Expired => {
177                self.metrics.on_enrichment_http_cache_expiration(&self.id);
178                self.metrics.on_enrichment_http_cache_miss(&self.id);
179            }
180        }
181        if let Some(cached_value) = cached {
182            let extracted = self.maybe_extract(&cached_value)?;
183            inject_enrichment(result, &self.inject_field, extracted);
184            return Ok(());
185        }
186
187        let mut header_map = HeaderMap::with_capacity(self.headers.len());
188        for (name, value_template) in &self.headers {
189            let rendered = render_template(value_template, result);
190            let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| EnrichError {
191                enricher_id: self.id.clone(),
192                kind: EnrichErrorKind::Fetch(format!("invalid header name '{name}': {e}")),
193            })?;
194            let header_value = HeaderValue::from_str(&rendered).map_err(|e| EnrichError {
195                enricher_id: self.id.clone(),
196                kind: EnrichErrorKind::Fetch(format!("invalid header value for '{name}': {e}")),
197            })?;
198            header_map.insert(header_name, header_value);
199        }
200
201        let method =
202            reqwest::Method::from_bytes(self.method.as_bytes()).map_err(|e| EnrichError {
203                enricher_id: self.id.clone(),
204                kind: EnrichErrorKind::Fetch(format!("invalid method '{}': {e}", self.method)),
205            })?;
206
207        let mut req = self
208            .client
209            .inner()
210            .request(method, &url)
211            .headers(header_map);
212        if let Some(b) = &body {
213            req = req.body(b.clone());
214        }
215        let resp = req.send().await.map_err(|e| EnrichError {
216            enricher_id: self.id.clone(),
217            kind: if e.is_timeout() {
218                EnrichErrorKind::Timeout
219            } else {
220                EnrichErrorKind::Fetch(format!("{e}"))
221            },
222        })?;
223
224        let status = resp.status();
225        if !status.is_success() {
226            return Err(EnrichError {
227                enricher_id: self.id.clone(),
228                kind: EnrichErrorKind::Fetch(format!("HTTP {status}")),
229            });
230        }
231
232        let bytes = resp.bytes().await.map_err(|e| EnrichError {
233            enricher_id: self.id.clone(),
234            kind: EnrichErrorKind::Fetch(format!("body read: {e}")),
235        })?;
236        let parsed: Value = serde_json::from_slice(&bytes).map_err(|e| EnrichError {
237            enricher_id: self.id.clone(),
238            kind: EnrichErrorKind::Parse(format!("JSON: {e}")),
239        })?;
240
241        // Cache the parsed value before extract: a different enricher
242        // sharing the same URL with a different `extract` benefits from
243        // the cached upstream JSON.
244        self.cache.insert(cache_key, parsed.clone());
245
246        let extracted = self.maybe_extract(&parsed)?;
247        inject_enrichment(result, &self.inject_field, extracted);
248        Ok(())
249    }
250}
251
252impl HttpEnricher {
253    /// Apply the configured `extract` expression to `value`. Returns the
254    /// raw value untouched when no extract is configured.
255    fn maybe_extract(&self, value: &Value) -> Result<Value, EnrichError> {
256        match &self.extract {
257            None => Ok(value.clone()),
258            Some(expr) => {
259                crate::sources::extract::apply_extract(value, expr).map_err(|e| EnrichError {
260                    enricher_id: self.id.clone(),
261                    kind: EnrichErrorKind::Extract(format!("{}", e.kind)),
262                })
263            }
264        }
265    }
266}