Skip to main content

rsigma_runtime/sources/
mod.rs

//! Dynamic source resolution for Sigma pipelines.
//!
//! This module provides the [`SourceResolver`] trait and a [`DefaultSourceResolver`]
//! implementation that fetches data from file, command, HTTP, and NATS sources
//! declared in a pipeline's `sources` section.
6
7pub mod cache;
8pub mod command;
9pub mod extract;
10pub mod file;
11pub mod http;
12pub mod include;
13#[cfg(feature = "nats")]
14pub mod nats;
15pub mod refresh;
16pub mod template;
17
18use std::time::{Duration, Instant};
19
20use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
21
/// Upper bound, in bytes, on a single source response body
/// (HTTP body, command stdout, or NATS payload): 10 MiB.
pub const MAX_SOURCE_RESPONSE_BYTES: usize = 10 * (1 << 20);
24
/// Lower bound for source refresh intervals (one second), intended to keep
/// refreshing from turning into a hot CPU loop.
pub const MIN_REFRESH_INTERVAL: Duration = Duration::from_millis(1_000);
27
28pub use cache::SourceCache;
29pub use template::TemplateExpander;
30
31/// The result of successfully resolving a dynamic source.
32#[derive(Debug, Clone)]
33pub struct ResolvedValue {
34    /// The resolved data as a YAML value (can be scalar, sequence, or mapping).
35    pub data: serde_json::Value,
36    /// When this value was resolved.
37    pub resolved_at: Instant,
38    /// Whether this value was served from cache rather than freshly fetched.
39    pub from_cache: bool,
40}
41
42/// An error that occurred while resolving a dynamic source.
43#[derive(Debug, Clone)]
44pub struct SourceError {
45    /// The source ID that failed.
46    pub source_id: String,
47    /// What went wrong.
48    pub kind: SourceErrorKind,
49}
50
51impl std::fmt::Display for SourceError {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(f, "source '{}': {}", self.source_id, self.kind)
54    }
55}
56
57impl std::error::Error for SourceError {}
58
/// Classifies what went wrong while resolving a source.
#[derive(Clone, Debug)]
pub enum SourceErrorKind {
    /// The source data could not be fetched or read.
    Fetch(String),
    /// The fetched data could not be parsed into the expected format.
    Parse(String),
    /// The `extract` expression failed or produced no data.
    Extract(String),
    /// The fetch did not finish within the configured timeout.
    Timeout,
    /// The response was larger than the maximum allowed size.
    ResourceLimit(String),
}
73
74impl std::fmt::Display for SourceErrorKind {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        match self {
77            Self::Fetch(msg) => write!(f, "fetch failed: {msg}"),
78            Self::Parse(msg) => write!(f, "parse failed: {msg}"),
79            Self::Extract(msg) => write!(f, "extract failed: {msg}"),
80            Self::Timeout => write!(f, "timed out"),
81            Self::ResourceLimit(msg) => write!(f, "resource limit exceeded: {msg}"),
82        }
83    }
84}
85
86/// Trait for resolving dynamic pipeline sources.
87///
88/// Implementations fetch data from external sources (files, commands, HTTP, NATS)
89/// and return it as a JSON value that can be injected into the pipeline.
90#[async_trait::async_trait]
91pub trait SourceResolver: Send + Sync {
92    /// Resolve a single dynamic source, returning the fetched data.
93    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
94}
95
96/// Default source resolver that dispatches to file, command, and HTTP resolvers.
97pub struct DefaultSourceResolver {
98    cache: SourceCache,
99}
100
101impl DefaultSourceResolver {
102    /// Create a new resolver with an in-memory cache.
103    pub fn new() -> Self {
104        Self {
105            cache: SourceCache::new(),
106        }
107    }
108
109    /// Create a new resolver with the given cache.
110    pub fn with_cache(cache: SourceCache) -> Self {
111        Self { cache }
112    }
113
114    /// Get a reference to the cache (for inspection/testing).
115    pub fn cache(&self) -> &SourceCache {
116        &self.cache
117    }
118}
119
120impl Default for DefaultSourceResolver {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126#[async_trait::async_trait]
127impl SourceResolver for DefaultSourceResolver {
128    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
129        let result = match &source.source_type {
130            SourceType::File {
131                path,
132                format,
133                extract,
134            } => file::resolve_file(path, *format, extract.as_ref()).await,
135            SourceType::Command {
136                command,
137                format,
138                extract,
139            } => command::resolve_command(command, *format, extract.as_ref(), source.timeout).await,
140            SourceType::Http {
141                url,
142                method,
143                headers,
144                format,
145                extract,
146            } => {
147                http::resolve_http(
148                    url,
149                    method.as_deref(),
150                    headers,
151                    *format,
152                    extract.as_ref(),
153                    source.timeout,
154                )
155                .await
156            }
157            #[cfg(feature = "nats")]
158            SourceType::Nats {
159                url,
160                subject,
161                format,
162                extract,
163            } => nats::resolve_nats_initial(url, subject, *format, extract.as_ref()).await,
164            #[cfg(not(feature = "nats"))]
165            SourceType::Nats { .. } => {
166                return Err(SourceError {
167                    source_id: source.id.clone(),
168                    kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
169                });
170            }
171        };
172
173        match result {
174            Ok(value) => {
175                tracing::debug!(source_id = %source.id, "Source fetched successfully");
176                self.cache.store(&source.id, &value.data);
177                Ok(value)
178            }
179            Err(mut err) => {
180                err.source_id = source.id.clone();
181                match source.on_error {
182                    ErrorPolicy::UseCached => {
183                        if let Some(cached) = self.cache.get(&source.id) {
184                            tracing::warn!(
185                                source_id = %source.id,
186                                error = %err,
187                                "Source resolution failed, using cached value"
188                            );
189                            Ok(ResolvedValue {
190                                data: cached,
191                                resolved_at: Instant::now(),
192                                from_cache: true,
193                            })
194                        } else {
195                            Err(err)
196                        }
197                    }
198                    ErrorPolicy::UseDefault => {
199                        if let Some(default) = &source.default {
200                            tracing::warn!(
201                                source_id = %source.id,
202                                error = %err,
203                                "Source resolution failed, using default value"
204                            );
205                            let json_default = yaml_value_to_json(default);
206                            Ok(ResolvedValue {
207                                data: json_default,
208                                resolved_at: Instant::now(),
209                                from_cache: false,
210                            })
211                        } else {
212                            Err(err)
213                        }
214                    }
215                    ErrorPolicy::Fail => Err(err),
216                }
217            }
218        }
219    }
220}
221
222/// Resolve all sources in a pipeline, returning a map of source_id -> resolved data.
223///
224/// Applies error policies: `use_cached`, `use_default`, or `fail`.
225/// Required sources with `Fail` policy propagate errors immediately.
226/// Optional sources (required=false) that fail are logged and skipped
227/// with a Null fallback value.
228pub async fn resolve_all(
229    resolver: &dyn SourceResolver,
230    sources: &[DynamicSource],
231) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
232    resolve_all_with_state(resolver, sources, None).await
233}
234
235/// Like [`resolve_all`] but also updates a [`PipelineState`] with source resolution status.
236pub async fn resolve_all_with_state(
237    resolver: &dyn SourceResolver,
238    sources: &[DynamicSource],
239    mut state: Option<&mut rsigma_eval::pipeline::state::PipelineState>,
240) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
241    let mut resolved = std::collections::HashMap::new();
242    for source in sources {
243        match resolver.resolve(source).await {
244            Ok(value) => {
245                resolved.insert(source.id.clone(), value.data);
246                if let Some(s) = state.as_deref_mut() {
247                    s.mark_source_resolved(&source.id);
248                }
249            }
250            Err(e) => {
251                if let Some(s) = state.as_deref_mut() {
252                    s.mark_source_failed(&source.id);
253                }
254                if source.required {
255                    return Err(e);
256                }
257                tracing::warn!(
258                    source_id = %source.id,
259                    error = %e,
260                    "Optional source resolution failed, using null"
261                );
262                resolved.insert(source.id.clone(), serde_json::Value::Null);
263            }
264        }
265    }
266    Ok(resolved)
267}
268
269/// Convert a `yaml_serde::Value` to a `serde_json::Value`.
270pub fn yaml_value_to_json(yaml: &yaml_serde::Value) -> serde_json::Value {
271    match yaml {
272        yaml_serde::Value::Null => serde_json::Value::Null,
273        yaml_serde::Value::Bool(b) => serde_json::Value::Bool(*b),
274        yaml_serde::Value::Number(n) => {
275            if let Some(i) = n.as_i64() {
276                serde_json::Value::Number(i.into())
277            } else if let Some(u) = n.as_u64() {
278                serde_json::Value::Number(u.into())
279            } else if let Some(f) = n.as_f64() {
280                serde_json::json!(f)
281            } else {
282                serde_json::Value::Null
283            }
284        }
285        yaml_serde::Value::String(s) => serde_json::Value::String(s.clone()),
286        yaml_serde::Value::Sequence(seq) => {
287            serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
288        }
289        yaml_serde::Value::Mapping(map) => {
290            let obj = map
291                .iter()
292                .map(|(k, v)| {
293                    let key = match k {
294                        yaml_serde::Value::String(s) => s.clone(),
295                        other => format!("{other:?}"),
296                    };
297                    (key, yaml_value_to_json(v))
298                })
299                .collect();
300            serde_json::Value::Object(obj)
301        }
302        yaml_serde::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
303    }
304}