Skip to main content

rsigma_runtime/sources/
mod.rs

1//! Dynamic source resolution for Sigma pipelines.
2//!
3//! This module provides the [`SourceResolver`] trait and a [`DefaultSourceResolver`]
4//! implementation that fetches data from file, command, HTTP, and NATS sources
5//! declared in a pipeline's `sources` section.
6
7pub mod cache;
8pub mod command;
9pub mod extract;
10pub mod file;
11pub mod http;
12pub mod include;
13#[cfg(feature = "nats")]
14pub mod nats;
15pub mod refresh;
16pub mod template;
17
18use std::time::Instant;
19
20use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
21
22pub use cache::SourceCache;
23pub use template::TemplateExpander;
24
25/// The result of successfully resolving a dynamic source.
26#[derive(Debug, Clone)]
27pub struct ResolvedValue {
28    /// The resolved data as a YAML value (can be scalar, sequence, or mapping).
29    pub data: serde_json::Value,
30    /// When this value was resolved.
31    pub resolved_at: Instant,
32    /// Whether this value was served from cache rather than freshly fetched.
33    pub from_cache: bool,
34}
35
36/// An error that occurred while resolving a dynamic source.
37#[derive(Debug, Clone)]
38pub struct SourceError {
39    /// The source ID that failed.
40    pub source_id: String,
41    /// What went wrong.
42    pub kind: SourceErrorKind,
43}
44
45impl std::fmt::Display for SourceError {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        write!(f, "source '{}': {}", self.source_id, self.kind)
48    }
49}
50
51impl std::error::Error for SourceError {}
52
/// Category of failure encountered while resolving a dynamic source.
///
/// The message-carrying variants hold a human-readable description. When displayed,
/// that description follows the variant's label.
#[derive(Debug, Clone)]
pub enum SourceErrorKind {
    /// Reading or fetching the raw source data failed.
    Fetch(String),
    /// The fetched data could not be parsed into the expected format.
    Parse(String),
    /// The `extract` expression failed or yielded no data.
    Extract(String),
    /// Fetching took longer than the configured timeout.
    Timeout,
}

impl std::fmt::Display for SourceErrorKind {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SourceErrorKind::Timeout => out.write_str("timed out"),
            SourceErrorKind::Fetch(msg) => write!(out, "fetch failed: {msg}"),
            SourceErrorKind::Parse(msg) => write!(out, "parse failed: {msg}"),
            SourceErrorKind::Extract(msg) => write!(out, "extract failed: {msg}"),
        }
    }
}
76
77/// Trait for resolving dynamic pipeline sources.
78///
79/// Implementations fetch data from external sources (files, commands, HTTP, NATS)
80/// and return it as a JSON value that can be injected into the pipeline.
81#[async_trait::async_trait]
82pub trait SourceResolver: Send + Sync {
83    /// Resolve a single dynamic source, returning the fetched data.
84    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
85}
86
87/// Default source resolver that dispatches to file, command, and HTTP resolvers.
88pub struct DefaultSourceResolver {
89    cache: SourceCache,
90}
91
92impl DefaultSourceResolver {
93    /// Create a new resolver with an in-memory cache.
94    pub fn new() -> Self {
95        Self {
96            cache: SourceCache::new(),
97        }
98    }
99
100    /// Create a new resolver with the given cache.
101    pub fn with_cache(cache: SourceCache) -> Self {
102        Self { cache }
103    }
104
105    /// Get a reference to the cache (for inspection/testing).
106    pub fn cache(&self) -> &SourceCache {
107        &self.cache
108    }
109}
110
111impl Default for DefaultSourceResolver {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117#[async_trait::async_trait]
118impl SourceResolver for DefaultSourceResolver {
119    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
120        let result = match &source.source_type {
121            SourceType::File {
122                path,
123                format,
124                extract,
125            } => file::resolve_file(path, *format, extract.as_ref()).await,
126            SourceType::Command {
127                command,
128                format,
129                extract,
130            } => command::resolve_command(command, *format, extract.as_ref()).await,
131            SourceType::Http {
132                url,
133                method,
134                headers,
135                format,
136                extract,
137            } => {
138                http::resolve_http(
139                    url,
140                    method.as_deref(),
141                    headers,
142                    *format,
143                    extract.as_ref(),
144                    source.timeout,
145                )
146                .await
147            }
148            #[cfg(feature = "nats")]
149            SourceType::Nats {
150                url,
151                subject,
152                format,
153                extract,
154            } => nats::resolve_nats_initial(url, subject, *format, extract.as_ref()).await,
155            #[cfg(not(feature = "nats"))]
156            SourceType::Nats { .. } => {
157                return Err(SourceError {
158                    source_id: source.id.clone(),
159                    kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
160                });
161            }
162        };
163
164        match result {
165            Ok(value) => {
166                self.cache.store(&source.id, &value.data);
167                Ok(value)
168            }
169            Err(mut err) => {
170                err.source_id = source.id.clone();
171                match source.on_error {
172                    ErrorPolicy::UseCached => {
173                        if let Some(cached) = self.cache.get(&source.id) {
174                            tracing::warn!(
175                                source_id = %source.id,
176                                error = %err,
177                                "Source resolution failed, using cached value"
178                            );
179                            Ok(ResolvedValue {
180                                data: cached,
181                                resolved_at: Instant::now(),
182                                from_cache: true,
183                            })
184                        } else {
185                            Err(err)
186                        }
187                    }
188                    ErrorPolicy::UseDefault => {
189                        if let Some(default) = &source.default {
190                            tracing::warn!(
191                                source_id = %source.id,
192                                error = %err,
193                                "Source resolution failed, using default value"
194                            );
195                            let json_default = yaml_value_to_json(default);
196                            Ok(ResolvedValue {
197                                data: json_default,
198                                resolved_at: Instant::now(),
199                                from_cache: false,
200                            })
201                        } else {
202                            Err(err)
203                        }
204                    }
205                    ErrorPolicy::Fail => Err(err),
206                }
207            }
208        }
209    }
210}
211
212/// Resolve all sources in a pipeline, returning a map of source_id -> resolved data.
213///
214/// Applies error policies: `use_cached`, `use_default`, or `fail`.
215/// Required sources with `Fail` policy propagate errors immediately.
216/// Optional sources (required=false) that fail are logged and skipped
217/// with a Null fallback value.
218pub async fn resolve_all(
219    resolver: &dyn SourceResolver,
220    sources: &[DynamicSource],
221) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
222    resolve_all_with_state(resolver, sources, None).await
223}
224
225/// Like [`resolve_all`] but also updates a [`PipelineState`] with source resolution status.
226pub async fn resolve_all_with_state(
227    resolver: &dyn SourceResolver,
228    sources: &[DynamicSource],
229    mut state: Option<&mut rsigma_eval::pipeline::state::PipelineState>,
230) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
231    let mut resolved = std::collections::HashMap::new();
232    for source in sources {
233        match resolver.resolve(source).await {
234            Ok(value) => {
235                resolved.insert(source.id.clone(), value.data);
236                if let Some(s) = state.as_deref_mut() {
237                    s.mark_source_resolved(&source.id);
238                }
239            }
240            Err(e) => {
241                if let Some(s) = state.as_deref_mut() {
242                    s.mark_source_failed(&source.id);
243                }
244                if source.required {
245                    return Err(e);
246                }
247                tracing::warn!(
248                    source_id = %source.id,
249                    error = %e,
250                    "Optional source resolution failed, using null"
251                );
252                resolved.insert(source.id.clone(), serde_json::Value::Null);
253            }
254        }
255    }
256    Ok(resolved)
257}
258
259/// Convert a `serde_yaml::Value` to a `serde_json::Value`.
260pub fn yaml_value_to_json(yaml: &serde_yaml::Value) -> serde_json::Value {
261    match yaml {
262        serde_yaml::Value::Null => serde_json::Value::Null,
263        serde_yaml::Value::Bool(b) => serde_json::Value::Bool(*b),
264        serde_yaml::Value::Number(n) => {
265            if let Some(i) = n.as_i64() {
266                serde_json::Value::Number(i.into())
267            } else if let Some(u) = n.as_u64() {
268                serde_json::Value::Number(u.into())
269            } else if let Some(f) = n.as_f64() {
270                serde_json::json!(f)
271            } else {
272                serde_json::Value::Null
273            }
274        }
275        serde_yaml::Value::String(s) => serde_json::Value::String(s.clone()),
276        serde_yaml::Value::Sequence(seq) => {
277            serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
278        }
279        serde_yaml::Value::Mapping(map) => {
280            let obj = map
281                .iter()
282                .map(|(k, v)| {
283                    let key = match k {
284                        serde_yaml::Value::String(s) => s.clone(),
285                        other => format!("{other:?}"),
286                    };
287                    (key, yaml_value_to_json(v))
288                })
289                .collect();
290            serde_json::Value::Object(obj)
291        }
292        serde_yaml::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
293    }
294}