Skip to main content

rsigma_runtime/sources/
http.rs

1//! HTTP source resolver: fetches data from HTTP endpoints.
2
3use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
6use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
7
8use super::extract::apply_extract;
9use super::file::parse_data;
10use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
11
12/// Resolve an HTTP source by fetching the URL and parsing the response.
13pub async fn resolve_http(
14    url: &str,
15    method: Option<&str>,
16    headers: &HashMap<String, String>,
17    format: DataFormat,
18    extract_expr: Option<&ExtractExpr>,
19    timeout: Option<Duration>,
20) -> Result<ResolvedValue, SourceError> {
21    resolve_http_with_limit(
22        url,
23        method,
24        headers,
25        format,
26        extract_expr,
27        timeout,
28        MAX_SOURCE_RESPONSE_BYTES,
29    )
30    .await
31}
32
33/// Inner implementation with a configurable size limit (for testing).
34pub(crate) async fn resolve_http_with_limit(
35    url: &str,
36    method: Option<&str>,
37    headers: &HashMap<String, String>,
38    format: DataFormat,
39    extract_expr: Option<&ExtractExpr>,
40    timeout: Option<Duration>,
41    max_bytes: usize,
42) -> Result<ResolvedValue, SourceError> {
43    let client = reqwest::Client::builder()
44        .timeout(timeout.unwrap_or(Duration::from_secs(30)))
45        .build()
46        .map_err(|e| SourceError {
47            source_id: String::new(),
48            kind: SourceErrorKind::Fetch(format!("failed to build HTTP client: {e}")),
49        })?;
50
51    let method_str = method.unwrap_or("GET");
52    let reqwest_method = method_str
53        .parse::<reqwest::Method>()
54        .map_err(|e| SourceError {
55            source_id: String::new(),
56            kind: SourceErrorKind::Fetch(format!("invalid HTTP method '{method_str}': {e}")),
57        })?;
58
59    let mut request = client.request(reqwest_method, url);
60
61    for (key, value) in headers {
62        let expanded_value = expand_env_vars(value);
63        request = request.header(key.as_str(), expanded_value);
64    }
65
66    let response = request.send().await.map_err(|e| {
67        if e.is_timeout() {
68            SourceError {
69                source_id: String::new(),
70                kind: SourceErrorKind::Timeout,
71            }
72        } else {
73            SourceError {
74                source_id: String::new(),
75                kind: SourceErrorKind::Fetch(format!("HTTP request failed: {e}")),
76            }
77        }
78    })?;
79
80    let status = response.status();
81    if !status.is_success() {
82        let body = read_body_capped(response, max_bytes)
83            .await
84            .unwrap_or_default();
85        return Err(SourceError {
86            source_id: String::new(),
87            kind: SourceErrorKind::Fetch(format!("HTTP {status}: {}", body.trim())),
88        });
89    }
90
91    if let Some(content_length) = response.content_length()
92        && content_length as usize > max_bytes
93    {
94        return Err(SourceError {
95            source_id: String::new(),
96            kind: SourceErrorKind::ResourceLimit(format!(
97                "HTTP response Content-Length ({content_length} bytes) exceeds {} byte limit",
98                max_bytes
99            )),
100        });
101    }
102
103    let body = read_body_capped(response, max_bytes).await?;
104
105    let parsed = parse_data(&body, format)?;
106
107    let data = if let Some(expr) = extract_expr {
108        apply_extract(&parsed, expr)?
109    } else {
110        parsed
111    };
112
113    Ok(ResolvedValue {
114        data,
115        resolved_at: Instant::now(),
116        from_cache: false,
117    })
118}
119
120/// Read a response body in chunks, enforcing a maximum byte cap.
121async fn read_body_capped(
122    mut response: reqwest::Response,
123    max_bytes: usize,
124) -> Result<String, SourceError> {
125    let mut buf = Vec::new();
126    while let Some(chunk) = response.chunk().await.map_err(|e| SourceError {
127        source_id: String::new(),
128        kind: SourceErrorKind::Fetch(format!("failed to read response chunk: {e}")),
129    })? {
130        if buf.len() + chunk.len() > max_bytes {
131            return Err(SourceError {
132                source_id: String::new(),
133                kind: SourceErrorKind::ResourceLimit(format!(
134                    "HTTP response body exceeds {} byte limit",
135                    max_bytes
136                )),
137            });
138        }
139        buf.extend_from_slice(&chunk);
140    }
141    String::from_utf8(buf).map_err(|e| SourceError {
142        source_id: String::new(),
143        kind: SourceErrorKind::Parse(format!("response body is not valid UTF-8: {e}")),
144    })
145}
146
147/// Expand `${ENV_VAR}` references in a string with environment variable values.
148fn expand_env_vars(s: &str) -> String {
149    let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").unwrap();
150    re.replace_all(s, |caps: &regex::Captures| {
151        let var_name = caps.get(1).unwrap().as_str();
152        std::env::var(var_name).unwrap_or_default()
153    })
154    .to_string()
155}