Skip to main content

rsigma_runtime/sources/
http.rs

1//! HTTP source resolver: fetches data from HTTP endpoints.
2
3use std::collections::HashMap;
4use std::sync::{Arc, OnceLock};
5use std::time::{Duration, Instant};
6
7use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
8
9use super::extract::apply_extract;
10use super::file::parse_data;
11use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
12
13/// Process-wide shared HTTP client for source fetches. Built lazily on first
14/// use and reused for every subsequent call so refresh storms (e.g. a
15/// dynamic-source pipeline pulling several feeds every 30 seconds) amortize
16/// TLS handshakes, connection pooling, and DNS resolution. Per-call timeouts
17/// are applied via [`reqwest::RequestBuilder::timeout`] so a shared client is
18/// safe even when callers want different timeouts.
19static DEFAULT_HTTP_SOURCE_CLIENT: OnceLock<Arc<reqwest::Client>> = OnceLock::new();
20
21const DEFAULT_SOURCE_TIMEOUT: Duration = Duration::from_secs(30);
22
23/// Return the process-wide HTTP client used by source resolution.
24///
25/// The first call constructs the client; subsequent calls return the same
26/// `Arc<reqwest::Client>`. The client is wired to the
27/// [`default_egress_policy`](crate::egress::default_egress_policy) via a
28/// custom DNS resolver so a Sigma rule that points the daemon at a cloud
29/// metadata endpoint cannot exfiltrate IAM credentials.
30///
31/// Errors building the client are surfaced as `SourceError` rather than
32/// panicking so callers can fail-soft on broken TLS setups instead of
33/// crashing the daemon.
34pub fn shared_http_source_client() -> Result<Arc<reqwest::Client>, SourceError> {
35    if let Some(client) = DEFAULT_HTTP_SOURCE_CLIENT.get() {
36        return Ok(client.clone());
37    }
38    let resolver =
39        crate::egress::EgressFilteredResolver::new(crate::egress::default_egress_policy())
40            .into_dns_resolver();
41    let built = reqwest::Client::builder()
42        .dns_resolver(resolver)
43        .build()
44        .map(Arc::new)
45        .map_err(|e| SourceError {
46            source_id: String::new(),
47            kind: SourceErrorKind::Fetch(format!("failed to build HTTP client: {e}")),
48        })?;
49    // Other threads may have raced us; whichever insert wins is fine.
50    Ok(DEFAULT_HTTP_SOURCE_CLIENT.get_or_init(|| built).clone())
51}
52
53/// Resolve an HTTP source by fetching the URL and parsing the response.
54pub async fn resolve_http(
55    url: &str,
56    method: Option<&str>,
57    headers: &HashMap<String, String>,
58    format: DataFormat,
59    extract_expr: Option<&ExtractExpr>,
60    timeout: Option<Duration>,
61) -> Result<ResolvedValue, SourceError> {
62    resolve_http_with_limit(
63        url,
64        method,
65        headers,
66        format,
67        extract_expr,
68        timeout,
69        MAX_SOURCE_RESPONSE_BYTES,
70    )
71    .await
72}
73
74/// Inner implementation with a configurable size limit (for testing).
75pub(crate) async fn resolve_http_with_limit(
76    url: &str,
77    method: Option<&str>,
78    headers: &HashMap<String, String>,
79    format: DataFormat,
80    extract_expr: Option<&ExtractExpr>,
81    timeout: Option<Duration>,
82    max_bytes: usize,
83) -> Result<ResolvedValue, SourceError> {
84    let client = shared_http_source_client()?;
85
86    let method_str = method.unwrap_or("GET");
87    let reqwest_method = method_str
88        .parse::<reqwest::Method>()
89        .map_err(|e| SourceError {
90            source_id: String::new(),
91            kind: SourceErrorKind::Fetch(format!("invalid HTTP method '{method_str}': {e}")),
92        })?;
93
94    let mut request = client
95        .request(reqwest_method, url)
96        .timeout(timeout.unwrap_or(DEFAULT_SOURCE_TIMEOUT));
97
98    for (key, value) in headers {
99        let expanded_value = expand_env_vars(value);
100        request = request.header(key.as_str(), expanded_value);
101    }
102
103    let response = request.send().await.map_err(|e| {
104        if e.is_timeout() {
105            SourceError {
106                source_id: String::new(),
107                kind: SourceErrorKind::Timeout,
108            }
109        } else {
110            SourceError {
111                source_id: String::new(),
112                kind: SourceErrorKind::Fetch(format!("HTTP request failed: {e}")),
113            }
114        }
115    })?;
116
117    let status = response.status();
118    if !status.is_success() {
119        let body = read_body_capped(response, max_bytes)
120            .await
121            .unwrap_or_default();
122        return Err(SourceError {
123            source_id: String::new(),
124            kind: SourceErrorKind::Fetch(format!("HTTP {status}: {}", body.trim())),
125        });
126    }
127
128    if let Some(content_length) = response.content_length()
129        && content_length as usize > max_bytes
130    {
131        return Err(SourceError {
132            source_id: String::new(),
133            kind: SourceErrorKind::ResourceLimit(format!(
134                "HTTP response Content-Length ({content_length} bytes) exceeds {max_bytes} byte limit"
135            )),
136        });
137    }
138
139    let body = read_body_capped(response, max_bytes).await?;
140
141    let parsed = parse_data(&body, format)?;
142
143    let data = if let Some(expr) = extract_expr {
144        apply_extract(&parsed, expr)?
145    } else {
146        parsed
147    };
148
149    Ok(ResolvedValue {
150        data,
151        resolved_at: Instant::now(),
152        from_cache: false,
153    })
154}
155
156/// Read a response body in chunks, enforcing a maximum byte cap.
157async fn read_body_capped(
158    mut response: reqwest::Response,
159    max_bytes: usize,
160) -> Result<String, SourceError> {
161    let mut buf = Vec::new();
162    while let Some(chunk) = response.chunk().await.map_err(|e| SourceError {
163        source_id: String::new(),
164        kind: SourceErrorKind::Fetch(format!("failed to read response chunk: {e}")),
165    })? {
166        if buf.len() + chunk.len() > max_bytes {
167            return Err(SourceError {
168                source_id: String::new(),
169                kind: SourceErrorKind::ResourceLimit(format!(
170                    "HTTP response body exceeds {max_bytes} byte limit"
171                )),
172            });
173        }
174        buf.extend_from_slice(&chunk);
175    }
176    String::from_utf8(buf).map_err(|e| SourceError {
177        source_id: String::new(),
178        kind: SourceErrorKind::Parse(format!("response body is not valid UTF-8: {e}")),
179    })
180}
181
182/// Expand `${ENV_VAR}` references in a string with environment variable values.
183fn expand_env_vars(s: &str) -> String {
184    let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").unwrap();
185    re.replace_all(s, |caps: &regex::Captures| {
186        let var_name = caps.get(1).unwrap().as_str();
187        std::env::var(var_name).unwrap_or_default()
188    })
189    .to_string()
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Repeated calls must hand back the very same allocation, so that source
    /// refresh storms reuse a single connection pool.
    #[test]
    fn shared_http_source_client_is_arc_stable() {
        let first = shared_http_source_client().expect("first build");
        let second = shared_http_source_client().expect("second call");

        let same_allocation = Arc::ptr_eq(&first, &second);
        assert!(
            same_allocation,
            "shared HTTP source client must be process-wide stable"
        );
    }
}