rsigma_runtime/sources/
http.rs1use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
6use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
7
8use super::extract::apply_extract;
9use super::file::parse_data;
10use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
11
12pub async fn resolve_http(
14 url: &str,
15 method: Option<&str>,
16 headers: &HashMap<String, String>,
17 format: DataFormat,
18 extract_expr: Option<&ExtractExpr>,
19 timeout: Option<Duration>,
20) -> Result<ResolvedValue, SourceError> {
21 resolve_http_with_limit(
22 url,
23 method,
24 headers,
25 format,
26 extract_expr,
27 timeout,
28 MAX_SOURCE_RESPONSE_BYTES,
29 )
30 .await
31}
32
33pub(crate) async fn resolve_http_with_limit(
35 url: &str,
36 method: Option<&str>,
37 headers: &HashMap<String, String>,
38 format: DataFormat,
39 extract_expr: Option<&ExtractExpr>,
40 timeout: Option<Duration>,
41 max_bytes: usize,
42) -> Result<ResolvedValue, SourceError> {
43 let client = reqwest::Client::builder()
44 .timeout(timeout.unwrap_or(Duration::from_secs(30)))
45 .build()
46 .map_err(|e| SourceError {
47 source_id: String::new(),
48 kind: SourceErrorKind::Fetch(format!("failed to build HTTP client: {e}")),
49 })?;
50
51 let method_str = method.unwrap_or("GET");
52 let reqwest_method = method_str
53 .parse::<reqwest::Method>()
54 .map_err(|e| SourceError {
55 source_id: String::new(),
56 kind: SourceErrorKind::Fetch(format!("invalid HTTP method '{method_str}': {e}")),
57 })?;
58
59 let mut request = client.request(reqwest_method, url);
60
61 for (key, value) in headers {
62 let expanded_value = expand_env_vars(value);
63 request = request.header(key.as_str(), expanded_value);
64 }
65
66 let response = request.send().await.map_err(|e| {
67 if e.is_timeout() {
68 SourceError {
69 source_id: String::new(),
70 kind: SourceErrorKind::Timeout,
71 }
72 } else {
73 SourceError {
74 source_id: String::new(),
75 kind: SourceErrorKind::Fetch(format!("HTTP request failed: {e}")),
76 }
77 }
78 })?;
79
80 let status = response.status();
81 if !status.is_success() {
82 let body = read_body_capped(response, max_bytes)
83 .await
84 .unwrap_or_default();
85 return Err(SourceError {
86 source_id: String::new(),
87 kind: SourceErrorKind::Fetch(format!("HTTP {status}: {}", body.trim())),
88 });
89 }
90
91 if let Some(content_length) = response.content_length()
92 && content_length as usize > max_bytes
93 {
94 return Err(SourceError {
95 source_id: String::new(),
96 kind: SourceErrorKind::ResourceLimit(format!(
97 "HTTP response Content-Length ({content_length} bytes) exceeds {} byte limit",
98 max_bytes
99 )),
100 });
101 }
102
103 let body = read_body_capped(response, max_bytes).await?;
104
105 let parsed = parse_data(&body, format)?;
106
107 let data = if let Some(expr) = extract_expr {
108 apply_extract(&parsed, expr)?
109 } else {
110 parsed
111 };
112
113 Ok(ResolvedValue {
114 data,
115 resolved_at: Instant::now(),
116 from_cache: false,
117 })
118}
119
120async fn read_body_capped(
122 mut response: reqwest::Response,
123 max_bytes: usize,
124) -> Result<String, SourceError> {
125 let mut buf = Vec::new();
126 while let Some(chunk) = response.chunk().await.map_err(|e| SourceError {
127 source_id: String::new(),
128 kind: SourceErrorKind::Fetch(format!("failed to read response chunk: {e}")),
129 })? {
130 if buf.len() + chunk.len() > max_bytes {
131 return Err(SourceError {
132 source_id: String::new(),
133 kind: SourceErrorKind::ResourceLimit(format!(
134 "HTTP response body exceeds {} byte limit",
135 max_bytes
136 )),
137 });
138 }
139 buf.extend_from_slice(&chunk);
140 }
141 String::from_utf8(buf).map_err(|e| SourceError {
142 source_id: String::new(),
143 kind: SourceErrorKind::Parse(format!("response body is not valid UTF-8: {e}")),
144 })
145}
146
147fn expand_env_vars(s: &str) -> String {
149 let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").unwrap();
150 re.replace_all(s, |caps: ®ex::Captures| {
151 let var_name = caps.get(1).unwrap().as_str();
152 std::env::var(var_name).unwrap_or_default()
153 })
154 .to_string()
155}