Skip to main content

rsigma_runtime/sources/
http.rs

1//! HTTP source resolver: fetches data from HTTP endpoints.
2
3use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
6use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
7
8use super::extract::apply_extract;
9use super::file::parse_data;
10use super::{ResolvedValue, SourceError, SourceErrorKind};
11
12/// Resolve an HTTP source by fetching the URL and parsing the response.
13pub async fn resolve_http(
14    url: &str,
15    method: Option<&str>,
16    headers: &HashMap<String, String>,
17    format: DataFormat,
18    extract_expr: Option<&ExtractExpr>,
19    timeout: Option<Duration>,
20) -> Result<ResolvedValue, SourceError> {
21    let client = reqwest::Client::builder()
22        .timeout(timeout.unwrap_or(Duration::from_secs(30)))
23        .build()
24        .map_err(|e| SourceError {
25            source_id: String::new(),
26            kind: SourceErrorKind::Fetch(format!("failed to build HTTP client: {e}")),
27        })?;
28
29    let method_str = method.unwrap_or("GET");
30    let reqwest_method = method_str
31        .parse::<reqwest::Method>()
32        .map_err(|e| SourceError {
33            source_id: String::new(),
34            kind: SourceErrorKind::Fetch(format!("invalid HTTP method '{method_str}': {e}")),
35        })?;
36
37    let mut request = client.request(reqwest_method, url);
38
39    for (key, value) in headers {
40        let expanded_value = expand_env_vars(value);
41        request = request.header(key.as_str(), expanded_value);
42    }
43
44    let response = request.send().await.map_err(|e| {
45        if e.is_timeout() {
46            SourceError {
47                source_id: String::new(),
48                kind: SourceErrorKind::Timeout,
49            }
50        } else {
51            SourceError {
52                source_id: String::new(),
53                kind: SourceErrorKind::Fetch(format!("HTTP request failed: {e}")),
54            }
55        }
56    })?;
57
58    let status = response.status();
59    if !status.is_success() {
60        let body = response.text().await.unwrap_or_default();
61        return Err(SourceError {
62            source_id: String::new(),
63            kind: SourceErrorKind::Fetch(format!("HTTP {status}: {}", body.trim())),
64        });
65    }
66
67    let body = response.text().await.map_err(|e| SourceError {
68        source_id: String::new(),
69        kind: SourceErrorKind::Fetch(format!("failed to read response body: {e}")),
70    })?;
71
72    let parsed = parse_data(&body, format)?;
73
74    let data = if let Some(expr) = extract_expr {
75        apply_extract(&parsed, expr)?
76    } else {
77        parsed
78    };
79
80    Ok(ResolvedValue {
81        data,
82        resolved_at: Instant::now(),
83        from_cache: false,
84    })
85}
86
87/// Expand `${ENV_VAR}` references in a string with environment variable values.
88fn expand_env_vars(s: &str) -> String {
89    let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").unwrap();
90    re.replace_all(s, |caps: &regex::Captures| {
91        let var_name = caps.get(1).unwrap().as_str();
92        std::env::var(var_name).unwrap_or_default()
93    })
94    .to_string()
95}