rsigma_runtime/sources/
http.rs1use std::collections::HashMap;
4use std::sync::{Arc, OnceLock};
5use std::time::{Duration, Instant};
6
7use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
8
9use super::extract::apply_extract;
10use super::file::parse_data;
11use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
12
13static DEFAULT_HTTP_SOURCE_CLIENT: OnceLock<Arc<reqwest::Client>> = OnceLock::new();
20
21const DEFAULT_SOURCE_TIMEOUT: Duration = Duration::from_secs(30);
22
23pub fn shared_http_source_client() -> Result<Arc<reqwest::Client>, SourceError> {
35 if let Some(client) = DEFAULT_HTTP_SOURCE_CLIENT.get() {
36 return Ok(client.clone());
37 }
38 let resolver =
39 crate::egress::EgressFilteredResolver::new(crate::egress::default_egress_policy())
40 .into_dns_resolver();
41 let built = reqwest::Client::builder()
42 .dns_resolver(resolver)
43 .build()
44 .map(Arc::new)
45 .map_err(|e| SourceError {
46 source_id: String::new(),
47 kind: SourceErrorKind::Fetch(format!("failed to build HTTP client: {e}")),
48 })?;
49 Ok(DEFAULT_HTTP_SOURCE_CLIENT.get_or_init(|| built).clone())
51}
52
53pub async fn resolve_http(
55 url: &str,
56 method: Option<&str>,
57 headers: &HashMap<String, String>,
58 format: DataFormat,
59 extract_expr: Option<&ExtractExpr>,
60 timeout: Option<Duration>,
61) -> Result<ResolvedValue, SourceError> {
62 resolve_http_with_limit(
63 url,
64 method,
65 headers,
66 format,
67 extract_expr,
68 timeout,
69 MAX_SOURCE_RESPONSE_BYTES,
70 )
71 .await
72}
73
74pub(crate) async fn resolve_http_with_limit(
76 url: &str,
77 method: Option<&str>,
78 headers: &HashMap<String, String>,
79 format: DataFormat,
80 extract_expr: Option<&ExtractExpr>,
81 timeout: Option<Duration>,
82 max_bytes: usize,
83) -> Result<ResolvedValue, SourceError> {
84 let client = shared_http_source_client()?;
85
86 let method_str = method.unwrap_or("GET");
87 let reqwest_method = method_str
88 .parse::<reqwest::Method>()
89 .map_err(|e| SourceError {
90 source_id: String::new(),
91 kind: SourceErrorKind::Fetch(format!("invalid HTTP method '{method_str}': {e}")),
92 })?;
93
94 let mut request = client
95 .request(reqwest_method, url)
96 .timeout(timeout.unwrap_or(DEFAULT_SOURCE_TIMEOUT));
97
98 for (key, value) in headers {
99 let expanded_value = expand_env_vars(value);
100 request = request.header(key.as_str(), expanded_value);
101 }
102
103 let response = request.send().await.map_err(|e| {
104 if e.is_timeout() {
105 SourceError {
106 source_id: String::new(),
107 kind: SourceErrorKind::Timeout,
108 }
109 } else {
110 SourceError {
111 source_id: String::new(),
112 kind: SourceErrorKind::Fetch(format!("HTTP request failed: {e}")),
113 }
114 }
115 })?;
116
117 let status = response.status();
118 if !status.is_success() {
119 let body = read_body_capped(response, max_bytes)
120 .await
121 .unwrap_or_default();
122 return Err(SourceError {
123 source_id: String::new(),
124 kind: SourceErrorKind::Fetch(format!("HTTP {status}: {}", body.trim())),
125 });
126 }
127
128 if let Some(content_length) = response.content_length()
129 && content_length as usize > max_bytes
130 {
131 return Err(SourceError {
132 source_id: String::new(),
133 kind: SourceErrorKind::ResourceLimit(format!(
134 "HTTP response Content-Length ({content_length} bytes) exceeds {max_bytes} byte limit"
135 )),
136 });
137 }
138
139 let body = read_body_capped(response, max_bytes).await?;
140
141 let parsed = parse_data(&body, format)?;
142
143 let data = if let Some(expr) = extract_expr {
144 apply_extract(&parsed, expr)?
145 } else {
146 parsed
147 };
148
149 Ok(ResolvedValue {
150 data,
151 resolved_at: Instant::now(),
152 from_cache: false,
153 })
154}
155
156async fn read_body_capped(
158 mut response: reqwest::Response,
159 max_bytes: usize,
160) -> Result<String, SourceError> {
161 let mut buf = Vec::new();
162 while let Some(chunk) = response.chunk().await.map_err(|e| SourceError {
163 source_id: String::new(),
164 kind: SourceErrorKind::Fetch(format!("failed to read response chunk: {e}")),
165 })? {
166 if buf.len() + chunk.len() > max_bytes {
167 return Err(SourceError {
168 source_id: String::new(),
169 kind: SourceErrorKind::ResourceLimit(format!(
170 "HTTP response body exceeds {max_bytes} byte limit"
171 )),
172 });
173 }
174 buf.extend_from_slice(&chunk);
175 }
176 String::from_utf8(buf).map_err(|e| SourceError {
177 source_id: String::new(),
178 kind: SourceErrorKind::Parse(format!("response body is not valid UTF-8: {e}")),
179 })
180}
181
182fn expand_env_vars(s: &str) -> String {
184 let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").unwrap();
185 re.replace_all(s, |caps: ®ex::Captures| {
186 let var_name = caps.get(1).unwrap().as_str();
187 std::env::var(var_name).unwrap_or_default()
188 })
189 .to_string()
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Two consecutive lookups must hand back the very same `Arc` allocation.
    #[test]
    fn shared_http_source_client_is_arc_stable() {
        let first = shared_http_source_client().expect("first build");
        let second = shared_http_source_client().expect("second call");

        let same_allocation = Arc::ptr_eq(&first, &second);
        assert!(
            same_allocation,
            "shared HTTP source client must be process-wide stable"
        );
    }
}