1pub mod cache;
8pub mod command;
9pub mod extract;
10pub mod file;
11pub mod http;
12pub mod include;
13#[cfg(feature = "nats")]
14pub mod nats;
15pub mod refresh;
16pub mod template;
17
18use std::time::Instant;
19
20use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
21
22pub use cache::SourceCache;
23pub use template::TemplateExpander;
24
25#[derive(Debug, Clone)]
27pub struct ResolvedValue {
28 pub data: serde_json::Value,
30 pub resolved_at: Instant,
32 pub from_cache: bool,
34}
35
36#[derive(Debug, Clone)]
38pub struct SourceError {
39 pub source_id: String,
41 pub kind: SourceErrorKind,
43}
44
45impl std::fmt::Display for SourceError {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 write!(f, "source '{}': {}", self.source_id, self.kind)
48 }
49}
50
51impl std::error::Error for SourceError {}
52
/// Category of a source resolution failure.
#[derive(Debug, Clone)]
pub enum SourceErrorKind {
    /// Fetching the source failed; carries a message describing the failure.
    Fetch(String),
    /// Parsing failed; carries a message describing the failure.
    Parse(String),
    /// Extraction failed; carries a message describing the failure.
    Extract(String),
    /// The operation timed out.
    Timeout,
}

impl std::fmt::Display for SourceErrorKind {
    /// Renders a short human-readable description: a fixed label followed by the
    /// carried message, or just `timed out` for [`SourceErrorKind::Timeout`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Fetch(msg) => write!(f, "fetch failed: {msg}"),
            Self::Parse(msg) => write!(f, "parse failed: {msg}"),
            Self::Extract(msg) => write!(f, "extract failed: {msg}"),
            Self::Timeout => write!(f, "timed out"),
        }
    }
}
76
77#[async_trait::async_trait]
82pub trait SourceResolver: Send + Sync {
83 async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
85}
86
87pub struct DefaultSourceResolver {
89 cache: SourceCache,
90}
91
92impl DefaultSourceResolver {
93 pub fn new() -> Self {
95 Self {
96 cache: SourceCache::new(),
97 }
98 }
99
100 pub fn with_cache(cache: SourceCache) -> Self {
102 Self { cache }
103 }
104
105 pub fn cache(&self) -> &SourceCache {
107 &self.cache
108 }
109}
110
111impl Default for DefaultSourceResolver {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117#[async_trait::async_trait]
118impl SourceResolver for DefaultSourceResolver {
119 async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
120 let result = match &source.source_type {
121 SourceType::File {
122 path,
123 format,
124 extract,
125 } => file::resolve_file(path, *format, extract.as_ref()).await,
126 SourceType::Command {
127 command,
128 format,
129 extract,
130 } => command::resolve_command(command, *format, extract.as_ref()).await,
131 SourceType::Http {
132 url,
133 method,
134 headers,
135 format,
136 extract,
137 } => {
138 http::resolve_http(
139 url,
140 method.as_deref(),
141 headers,
142 *format,
143 extract.as_ref(),
144 source.timeout,
145 )
146 .await
147 }
148 #[cfg(feature = "nats")]
149 SourceType::Nats {
150 url,
151 subject,
152 format,
153 extract,
154 } => nats::resolve_nats_initial(url, subject, *format, extract.as_ref()).await,
155 #[cfg(not(feature = "nats"))]
156 SourceType::Nats { .. } => {
157 return Err(SourceError {
158 source_id: source.id.clone(),
159 kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
160 });
161 }
162 };
163
164 match result {
165 Ok(value) => {
166 self.cache.store(&source.id, &value.data);
167 Ok(value)
168 }
169 Err(mut err) => {
170 err.source_id = source.id.clone();
171 match source.on_error {
172 ErrorPolicy::UseCached => {
173 if let Some(cached) = self.cache.get(&source.id) {
174 tracing::warn!(
175 source_id = %source.id,
176 error = %err,
177 "Source resolution failed, using cached value"
178 );
179 Ok(ResolvedValue {
180 data: cached,
181 resolved_at: Instant::now(),
182 from_cache: true,
183 })
184 } else {
185 Err(err)
186 }
187 }
188 ErrorPolicy::UseDefault => {
189 if let Some(default) = &source.default {
190 tracing::warn!(
191 source_id = %source.id,
192 error = %err,
193 "Source resolution failed, using default value"
194 );
195 let json_default = yaml_value_to_json(default);
196 Ok(ResolvedValue {
197 data: json_default,
198 resolved_at: Instant::now(),
199 from_cache: false,
200 })
201 } else {
202 Err(err)
203 }
204 }
205 ErrorPolicy::Fail => Err(err),
206 }
207 }
208 }
209 }
210}
211
212pub async fn resolve_all(
219 resolver: &dyn SourceResolver,
220 sources: &[DynamicSource],
221) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
222 resolve_all_with_state(resolver, sources, None).await
223}
224
225pub async fn resolve_all_with_state(
227 resolver: &dyn SourceResolver,
228 sources: &[DynamicSource],
229 mut state: Option<&mut rsigma_eval::pipeline::state::PipelineState>,
230) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
231 let mut resolved = std::collections::HashMap::new();
232 for source in sources {
233 match resolver.resolve(source).await {
234 Ok(value) => {
235 resolved.insert(source.id.clone(), value.data);
236 if let Some(s) = state.as_deref_mut() {
237 s.mark_source_resolved(&source.id);
238 }
239 }
240 Err(e) => {
241 if let Some(s) = state.as_deref_mut() {
242 s.mark_source_failed(&source.id);
243 }
244 if source.required {
245 return Err(e);
246 }
247 tracing::warn!(
248 source_id = %source.id,
249 error = %e,
250 "Optional source resolution failed, using null"
251 );
252 resolved.insert(source.id.clone(), serde_json::Value::Null);
253 }
254 }
255 }
256 Ok(resolved)
257}
258
259pub fn yaml_value_to_json(yaml: &serde_yaml::Value) -> serde_json::Value {
261 match yaml {
262 serde_yaml::Value::Null => serde_json::Value::Null,
263 serde_yaml::Value::Bool(b) => serde_json::Value::Bool(*b),
264 serde_yaml::Value::Number(n) => {
265 if let Some(i) = n.as_i64() {
266 serde_json::Value::Number(i.into())
267 } else if let Some(u) = n.as_u64() {
268 serde_json::Value::Number(u.into())
269 } else if let Some(f) = n.as_f64() {
270 serde_json::json!(f)
271 } else {
272 serde_json::Value::Null
273 }
274 }
275 serde_yaml::Value::String(s) => serde_json::Value::String(s.clone()),
276 serde_yaml::Value::Sequence(seq) => {
277 serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
278 }
279 serde_yaml::Value::Mapping(map) => {
280 let obj = map
281 .iter()
282 .map(|(k, v)| {
283 let key = match k {
284 serde_yaml::Value::String(s) => s.clone(),
285 other => format!("{other:?}"),
286 };
287 (key, yaml_value_to_json(v))
288 })
289 .collect();
290 serde_json::Value::Object(obj)
291 }
292 serde_yaml::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
293 }
294}