1pub mod cache;
8pub mod command;
9pub mod extract;
10pub mod file;
11pub mod http;
12pub mod include;
13#[cfg(feature = "nats")]
14pub mod nats;
15pub mod refresh;
16pub mod registry;
17pub mod template;
18
19use std::time::{Duration, Instant};
20
21use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
22
/// Size limit of 10 MiB (`10 * 1024 * 1024` bytes).
///
/// NOTE(review): presumably the cap applied to the bytes read from a single
/// source response; the enforcement sites are not in this part of the file.
pub const MAX_SOURCE_RESPONSE_BYTES: usize = 10 * 1024 * 1024;

/// A duration of one second.
///
/// NOTE(review): presumably the shortest refresh interval accepted for a
/// source; confirm against the `refresh` module.
pub const MIN_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
28
29pub use cache::SourceCache;
30pub use template::TemplateExpander;
31
/// The outcome of successfully resolving one dynamic source.
#[derive(Debug, Clone)]
pub struct ResolvedValue {
    /// The resolved payload, represented as JSON.
    pub data: serde_json::Value,
    /// The instant this value was produced. The fallback paths of
    /// `DefaultSourceResolver::resolve` stamp it with `Instant::now()`.
    pub resolved_at: Instant,
    /// `true` when the value was served from the source cache after a failed
    /// fetch; the default-value fallback sets it to `false`.
    pub from_cache: bool,
}
42
/// An error raised while resolving a dynamic source.
///
/// Displayed as `source '<source_id>': <kind>`.
#[derive(Debug, Clone)]
pub struct SourceError {
    /// Identifier of the source that failed. `DefaultSourceResolver::resolve`
    /// overwrites this with the id of the source being resolved.
    pub source_id: String,
    /// The category of failure, with a detail message where applicable.
    pub kind: SourceErrorKind,
}
51
52impl std::fmt::Display for SourceError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "source '{}': {}", self.source_id, self.kind)
55 }
56}
57
// Marker impl: `SourceError` relies entirely on the default methods of
// `std::error::Error` (it exposes no underlying `source()`).
impl std::error::Error for SourceError {}
59
/// The category of a [`SourceError`].
///
/// Variants that carry a `String` hold a human-readable detail message that
/// is appended to the variant's prefix when displayed.
#[derive(Debug, Clone)]
pub enum SourceErrorKind {
    /// Fetching the source failed (displayed as `fetch failed: ...`).
    Fetch(String),
    /// Parsing the fetched data failed (displayed as `parse failed: ...`).
    Parse(String),
    /// Extracting a value from the data failed (displayed as `extract failed: ...`).
    Extract(String),
    /// The operation timed out (displayed as `timed out`).
    Timeout,
    /// A resource limit was exceeded (displayed as `resource limit exceeded: ...`).
    ResourceLimit(String),
}
74
75impl std::fmt::Display for SourceErrorKind {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 Self::Fetch(msg) => write!(f, "fetch failed: {msg}"),
79 Self::Parse(msg) => write!(f, "parse failed: {msg}"),
80 Self::Extract(msg) => write!(f, "extract failed: {msg}"),
81 Self::Timeout => write!(f, "timed out"),
82 Self::ResourceLimit(msg) => write!(f, "resource limit exceeded: {msg}"),
83 }
84 }
85}
86
/// Asynchronous strategy for turning a [`DynamicSource`] into a value.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait SourceResolver: Send + Sync {
    /// Resolves `source`, returning its value or a [`SourceError`] describing
    /// why resolution failed.
    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
}
96
/// [`SourceResolver`] implementation that dispatches on the source type and
/// records successful results in a shared [`SourceCache`].
pub struct DefaultSourceResolver {
    /// Cache of resolved data, reference-counted so it can be shared with
    /// other holders (see `with_arc_cache` / `arc_cache`).
    cache: std::sync::Arc<SourceCache>,
}
101
102impl DefaultSourceResolver {
103 pub fn new() -> Self {
105 Self {
106 cache: std::sync::Arc::new(SourceCache::new()),
107 }
108 }
109
110 pub fn with_cache(cache: SourceCache) -> Self {
112 Self {
113 cache: std::sync::Arc::new(cache),
114 }
115 }
116
117 pub fn with_arc_cache(cache: std::sync::Arc<SourceCache>) -> Self {
121 Self { cache }
122 }
123
124 pub fn cache(&self) -> &SourceCache {
126 &self.cache
127 }
128
129 pub fn arc_cache(&self) -> std::sync::Arc<SourceCache> {
132 self.cache.clone()
133 }
134}
135
136impl Default for DefaultSourceResolver {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142#[async_trait::async_trait]
143impl SourceResolver for DefaultSourceResolver {
144 async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
145 let result = match &source.source_type {
146 SourceType::File {
147 path,
148 format,
149 extract,
150 } => file::resolve_file(path, *format, extract.as_ref()).await,
151 SourceType::Command {
152 command,
153 format,
154 extract,
155 } => command::resolve_command(command, *format, extract.as_ref(), source.timeout).await,
156 SourceType::Http {
157 url,
158 method,
159 headers,
160 format,
161 extract,
162 } => {
163 http::resolve_http(
164 url,
165 method.as_deref(),
166 headers,
167 *format,
168 extract.as_ref(),
169 source.timeout,
170 )
171 .await
172 }
173 #[cfg(feature = "nats")]
174 SourceType::Nats {
175 url,
176 subject,
177 format,
178 extract,
179 } => nats::resolve_nats_initial(url, subject, *format, extract.as_ref()).await,
180 #[cfg(not(feature = "nats"))]
181 SourceType::Nats { .. } => {
182 return Err(SourceError {
183 source_id: source.id.clone(),
184 kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
185 });
186 }
187 };
188
189 match result {
190 Ok(value) => {
191 tracing::debug!(source_id = %source.id, "Source fetched successfully");
192 self.cache.store(&source.id, &value.data);
193 Ok(value)
194 }
195 Err(mut err) => {
196 err.source_id = source.id.clone();
197 match source.on_error {
198 ErrorPolicy::UseCached => {
199 if let Some(cached) = self.cache.get(&source.id) {
200 tracing::warn!(
201 source_id = %source.id,
202 error = %err,
203 "Source resolution failed, using cached value"
204 );
205 Ok(ResolvedValue {
206 data: cached,
207 resolved_at: Instant::now(),
208 from_cache: true,
209 })
210 } else {
211 Err(err)
212 }
213 }
214 ErrorPolicy::UseDefault => {
215 if let Some(default) = &source.default {
216 tracing::warn!(
217 source_id = %source.id,
218 error = %err,
219 "Source resolution failed, using default value"
220 );
221 let json_default = yaml_value_to_json(default);
222 Ok(ResolvedValue {
223 data: json_default,
224 resolved_at: Instant::now(),
225 from_cache: false,
226 })
227 } else {
228 Err(err)
229 }
230 }
231 ErrorPolicy::Fail => Err(err),
232 }
233 }
234 }
235 }
236}
237
238pub async fn resolve_all(
245 resolver: &dyn SourceResolver,
246 sources: &[DynamicSource],
247) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
248 resolve_all_with_state(resolver, sources, None).await
249}
250
251pub async fn resolve_all_with_state(
255 resolver: &dyn SourceResolver,
256 sources: &[DynamicSource],
257 mut state: Option<&mut rsigma_eval::pipeline::state::PipelineState>,
258) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
259 let mut resolved = std::collections::HashMap::new();
260 for source in sources {
261 match resolver.resolve(source).await {
262 Ok(value) => {
263 resolved.insert(source.id.clone(), value.data);
264 if let Some(s) = state.as_deref_mut() {
265 s.mark_source_resolved(&source.id);
266 }
267 }
268 Err(e) => {
269 if let Some(s) = state.as_deref_mut() {
270 s.mark_source_failed(&source.id);
271 }
272 if source.required {
273 return Err(e);
274 }
275 tracing::warn!(
276 source_id = %source.id,
277 error = %e,
278 "Optional source resolution failed, using null"
279 );
280 resolved.insert(source.id.clone(), serde_json::Value::Null);
281 }
282 }
283 }
284 Ok(resolved)
285}
286
287pub fn yaml_value_to_json(yaml: &yaml_serde::Value) -> serde_json::Value {
289 match yaml {
290 yaml_serde::Value::Null => serde_json::Value::Null,
291 yaml_serde::Value::Bool(b) => serde_json::Value::Bool(*b),
292 yaml_serde::Value::Number(n) => {
293 if let Some(i) = n.as_i64() {
294 serde_json::Value::Number(i.into())
295 } else if let Some(u) = n.as_u64() {
296 serde_json::Value::Number(u.into())
297 } else if let Some(f) = n.as_f64() {
298 serde_json::json!(f)
299 } else {
300 serde_json::Value::Null
301 }
302 }
303 yaml_serde::Value::String(s) => serde_json::Value::String(s.clone()),
304 yaml_serde::Value::Sequence(seq) => {
305 serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
306 }
307 yaml_serde::Value::Mapping(map) => {
308 let obj = map
309 .iter()
310 .map(|(k, v)| {
311 let key = match k {
312 yaml_serde::Value::String(s) => s.clone(),
313 other => format!("{other:?}"),
314 };
315 (key, yaml_value_to_json(v))
316 })
317 .collect();
318 serde_json::Value::Object(obj)
319 }
320 yaml_serde::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
321 }
322}