1pub mod cache;
8pub mod command;
9pub mod extract;
10pub mod file;
11pub mod http;
12pub mod include;
13#[cfg(feature = "nats")]
14pub mod nats;
15pub mod refresh;
16pub mod template;
17
18use std::time::{Duration, Instant};
19
20use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
21
/// Upper bound, in bytes, on a single source response: 10 MiB.
///
/// NOTE(review): enforcement is not visible in this module; presumably the
/// individual resolvers check against it — confirm.
pub const MAX_SOURCE_RESPONSE_BYTES: usize = 10 * 1024 * 1024;

/// Smallest refresh interval for a source: one second.
///
/// NOTE(review): presumably used to clamp configured refresh intervals —
/// confirm against the `refresh` module.
pub const MIN_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
27
28pub use cache::SourceCache;
29pub use template::TemplateExpander;
30
31#[derive(Debug, Clone)]
33pub struct ResolvedValue {
34 pub data: serde_json::Value,
36 pub resolved_at: Instant,
38 pub from_cache: bool,
40}
41
42#[derive(Debug, Clone)]
44pub struct SourceError {
45 pub source_id: String,
47 pub kind: SourceErrorKind,
49}
50
51impl std::fmt::Display for SourceError {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 write!(f, "source '{}': {}", self.source_id, self.kind)
54 }
55}
56
57impl std::error::Error for SourceError {}
58
/// Category of a source resolution failure.
#[derive(Clone, Debug)]
pub enum SourceErrorKind {
    /// Fetching the source failed; carries a detail message.
    Fetch(String),
    /// Parsing the source failed; carries a detail message.
    Parse(String),
    /// Extraction failed; carries a detail message.
    Extract(String),
    /// The operation timed out.
    Timeout,
    /// A resource limit was exceeded; carries a detail message.
    ResourceLimit(String),
}

impl std::fmt::Display for SourceErrorKind {
    /// Renders as `<label>: <detail>`, or just `timed out` for
    /// [`SourceErrorKind::Timeout`], which carries no detail.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::Timeout => return f.write_str("timed out"),
            Self::Fetch(detail) => ("fetch failed", detail),
            Self::Parse(detail) => ("parse failed", detail),
            Self::Extract(detail) => ("extract failed", detail),
            Self::ResourceLimit(detail) => ("resource limit exceeded", detail),
        };
        write!(f, "{label}: {detail}")
    }
}
85
86#[async_trait::async_trait]
91pub trait SourceResolver: Send + Sync {
92 async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
94}
95
96pub struct DefaultSourceResolver {
98 cache: SourceCache,
99}
100
101impl DefaultSourceResolver {
102 pub fn new() -> Self {
104 Self {
105 cache: SourceCache::new(),
106 }
107 }
108
109 pub fn with_cache(cache: SourceCache) -> Self {
111 Self { cache }
112 }
113
114 pub fn cache(&self) -> &SourceCache {
116 &self.cache
117 }
118}
119
120impl Default for DefaultSourceResolver {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126#[async_trait::async_trait]
127impl SourceResolver for DefaultSourceResolver {
128 async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
129 let result = match &source.source_type {
130 SourceType::File {
131 path,
132 format,
133 extract,
134 } => file::resolve_file(path, *format, extract.as_ref()).await,
135 SourceType::Command {
136 command,
137 format,
138 extract,
139 } => command::resolve_command(command, *format, extract.as_ref(), source.timeout).await,
140 SourceType::Http {
141 url,
142 method,
143 headers,
144 format,
145 extract,
146 } => {
147 http::resolve_http(
148 url,
149 method.as_deref(),
150 headers,
151 *format,
152 extract.as_ref(),
153 source.timeout,
154 )
155 .await
156 }
157 #[cfg(feature = "nats")]
158 SourceType::Nats {
159 url,
160 subject,
161 format,
162 extract,
163 } => nats::resolve_nats_initial(url, subject, *format, extract.as_ref()).await,
164 #[cfg(not(feature = "nats"))]
165 SourceType::Nats { .. } => {
166 return Err(SourceError {
167 source_id: source.id.clone(),
168 kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
169 });
170 }
171 };
172
173 match result {
174 Ok(value) => {
175 tracing::debug!(source_id = %source.id, "Source fetched successfully");
176 self.cache.store(&source.id, &value.data);
177 Ok(value)
178 }
179 Err(mut err) => {
180 err.source_id = source.id.clone();
181 match source.on_error {
182 ErrorPolicy::UseCached => {
183 if let Some(cached) = self.cache.get(&source.id) {
184 tracing::warn!(
185 source_id = %source.id,
186 error = %err,
187 "Source resolution failed, using cached value"
188 );
189 Ok(ResolvedValue {
190 data: cached,
191 resolved_at: Instant::now(),
192 from_cache: true,
193 })
194 } else {
195 Err(err)
196 }
197 }
198 ErrorPolicy::UseDefault => {
199 if let Some(default) = &source.default {
200 tracing::warn!(
201 source_id = %source.id,
202 error = %err,
203 "Source resolution failed, using default value"
204 );
205 let json_default = yaml_value_to_json(default);
206 Ok(ResolvedValue {
207 data: json_default,
208 resolved_at: Instant::now(),
209 from_cache: false,
210 })
211 } else {
212 Err(err)
213 }
214 }
215 ErrorPolicy::Fail => Err(err),
216 }
217 }
218 }
219 }
220}
221
222pub async fn resolve_all(
229 resolver: &dyn SourceResolver,
230 sources: &[DynamicSource],
231) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
232 resolve_all_with_state(resolver, sources, None).await
233}
234
235pub async fn resolve_all_with_state(
237 resolver: &dyn SourceResolver,
238 sources: &[DynamicSource],
239 mut state: Option<&mut rsigma_eval::pipeline::state::PipelineState>,
240) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
241 let mut resolved = std::collections::HashMap::new();
242 for source in sources {
243 match resolver.resolve(source).await {
244 Ok(value) => {
245 resolved.insert(source.id.clone(), value.data);
246 if let Some(s) = state.as_deref_mut() {
247 s.mark_source_resolved(&source.id);
248 }
249 }
250 Err(e) => {
251 if let Some(s) = state.as_deref_mut() {
252 s.mark_source_failed(&source.id);
253 }
254 if source.required {
255 return Err(e);
256 }
257 tracing::warn!(
258 source_id = %source.id,
259 error = %e,
260 "Optional source resolution failed, using null"
261 );
262 resolved.insert(source.id.clone(), serde_json::Value::Null);
263 }
264 }
265 }
266 Ok(resolved)
267}
268
269pub fn yaml_value_to_json(yaml: &yaml_serde::Value) -> serde_json::Value {
271 match yaml {
272 yaml_serde::Value::Null => serde_json::Value::Null,
273 yaml_serde::Value::Bool(b) => serde_json::Value::Bool(*b),
274 yaml_serde::Value::Number(n) => {
275 if let Some(i) = n.as_i64() {
276 serde_json::Value::Number(i.into())
277 } else if let Some(u) = n.as_u64() {
278 serde_json::Value::Number(u.into())
279 } else if let Some(f) = n.as_f64() {
280 serde_json::json!(f)
281 } else {
282 serde_json::Value::Null
283 }
284 }
285 yaml_serde::Value::String(s) => serde_json::Value::String(s.clone()),
286 yaml_serde::Value::Sequence(seq) => {
287 serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
288 }
289 yaml_serde::Value::Mapping(map) => {
290 let obj = map
291 .iter()
292 .map(|(k, v)| {
293 let key = match k {
294 yaml_serde::Value::String(s) => s.clone(),
295 other => format!("{other:?}"),
296 };
297 (key, yaml_value_to_json(v))
298 })
299 .collect();
300 serde_json::Value::Object(obj)
301 }
302 yaml_serde::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
303 }
304}