Skip to main content

rsigma_runtime/sources/
mod.rs

1//! Dynamic source resolution for Sigma pipelines.
2//!
3//! This module provides the [`SourceResolver`] trait and a [`DefaultSourceResolver`]
4//! implementation that fetches data from file, command, HTTP, and NATS sources
5//! declared in a pipeline's `sources` section.
6
7pub mod cache;
8pub mod command;
9pub mod extract;
10pub mod file;
11pub mod http;
12pub mod include;
13#[cfg(feature = "nats")]
14pub mod nats;
15pub mod refresh;
16pub mod registry;
17pub mod template;
18
19use std::time::{Duration, Instant};
20
21use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
22
/// Upper bound, in bytes, on a single source response body (HTTP body,
/// command stdout, or NATS payload): 10 MiB.
pub const MAX_SOURCE_RESPONSE_BYTES: usize = 10 << 20;

/// Shortest refresh interval allowed for a source, so refresh loops cannot
/// spin the CPU.
pub const MIN_REFRESH_INTERVAL: Duration = Duration::from_millis(1_000);
28
29pub use cache::SourceCache;
30pub use template::TemplateExpander;
31
32/// The result of successfully resolving a dynamic source.
33#[derive(Debug, Clone)]
34pub struct ResolvedValue {
35    /// The resolved data as a YAML value (can be scalar, sequence, or mapping).
36    pub data: serde_json::Value,
37    /// When this value was resolved.
38    pub resolved_at: Instant,
39    /// Whether this value was served from cache rather than freshly fetched.
40    pub from_cache: bool,
41}
42
43/// An error that occurred while resolving a dynamic source.
44#[derive(Debug, Clone)]
45pub struct SourceError {
46    /// The source ID that failed.
47    pub source_id: String,
48    /// What went wrong.
49    pub kind: SourceErrorKind,
50}
51
52impl std::fmt::Display for SourceError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "source '{}': {}", self.source_id, self.kind)
55    }
56}
57
58impl std::error::Error for SourceError {}
59
/// Category of a source-resolution failure.
#[derive(Debug, Clone)]
pub enum SourceErrorKind {
    /// Reading or fetching the source data failed.
    Fetch(String),
    /// The fetched data could not be parsed into the expected format.
    Parse(String),
    /// The `extract` expression failed or produced no data.
    Extract(String),
    /// The fetch did not finish within the configured timeout.
    Timeout,
    /// The response was larger than the maximum allowed size.
    ResourceLimit(String),
}

impl std::fmt::Display for SourceErrorKind {
    /// Renders a fixed summary, followed by `: <detail>` for variants that carry one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (summary, detail) = match self {
            Self::Fetch(detail) => ("fetch failed", Some(detail)),
            Self::Parse(detail) => ("parse failed", Some(detail)),
            Self::Extract(detail) => ("extract failed", Some(detail)),
            Self::ResourceLimit(detail) => ("resource limit exceeded", Some(detail)),
            Self::Timeout => ("timed out", None),
        };
        match detail {
            Some(detail) => write!(f, "{summary}: {detail}"),
            None => f.write_str(summary),
        }
    }
}
86
87/// Trait for resolving dynamic pipeline sources.
88///
89/// Implementations fetch data from external sources (files, commands, HTTP, NATS)
90/// and return it as a JSON value that can be injected into the pipeline.
91#[async_trait::async_trait]
92pub trait SourceResolver: Send + Sync {
93    /// Resolve a single dynamic source, returning the fetched data.
94    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
95}
96
97/// Default source resolver that dispatches to file, command, and HTTP resolvers.
98pub struct DefaultSourceResolver {
99    cache: std::sync::Arc<SourceCache>,
100}
101
102impl DefaultSourceResolver {
103    /// Create a new resolver with an in-memory cache.
104    pub fn new() -> Self {
105        Self {
106            cache: std::sync::Arc::new(SourceCache::new()),
107        }
108    }
109
110    /// Create a new resolver with the given cache.
111    pub fn with_cache(cache: SourceCache) -> Self {
112        Self {
113            cache: std::sync::Arc::new(cache),
114        }
115    }
116
117    /// Create a new resolver that shares an existing `Arc<SourceCache>`
118    /// with another consumer (e.g. the daemon's enrichment pipeline,
119    /// which reads from the same cache for `lookup` enrichers).
120    pub fn with_arc_cache(cache: std::sync::Arc<SourceCache>) -> Self {
121        Self { cache }
122    }
123
124    /// Get a reference to the cache (for inspection/testing).
125    pub fn cache(&self) -> &SourceCache {
126        &self.cache
127    }
128
129    /// Borrow the shared `Arc<SourceCache>` so other components (e.g.
130    /// the daemon's enrichment pipeline) can read from the same cache.
131    pub fn arc_cache(&self) -> std::sync::Arc<SourceCache> {
132        self.cache.clone()
133    }
134}
135
136impl Default for DefaultSourceResolver {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142#[async_trait::async_trait]
143impl SourceResolver for DefaultSourceResolver {
144    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
145        let result = match &source.source_type {
146            SourceType::File {
147                path,
148                format,
149                extract,
150            } => file::resolve_file(path, *format, extract.as_ref()).await,
151            SourceType::Command {
152                command,
153                format,
154                extract,
155            } => command::resolve_command(command, *format, extract.as_ref(), source.timeout).await,
156            SourceType::Http {
157                url,
158                method,
159                headers,
160                format,
161                extract,
162            } => {
163                http::resolve_http(
164                    url,
165                    method.as_deref(),
166                    headers,
167                    *format,
168                    extract.as_ref(),
169                    source.timeout,
170                )
171                .await
172            }
173            #[cfg(feature = "nats")]
174            SourceType::Nats {
175                url,
176                subject,
177                format,
178                extract,
179            } => nats::resolve_nats_initial(url, subject, *format, extract.as_ref()).await,
180            #[cfg(not(feature = "nats"))]
181            SourceType::Nats { .. } => {
182                return Err(SourceError {
183                    source_id: source.id.clone(),
184                    kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
185                });
186            }
187        };
188
189        match result {
190            Ok(value) => {
191                tracing::debug!(source_id = %source.id, "Source fetched successfully");
192                self.cache.store(&source.id, &value.data);
193                Ok(value)
194            }
195            Err(mut err) => {
196                err.source_id = source.id.clone();
197                match source.on_error {
198                    ErrorPolicy::UseCached => {
199                        if let Some(cached) = self.cache.get(&source.id) {
200                            tracing::warn!(
201                                source_id = %source.id,
202                                error = %err,
203                                "Source resolution failed, using cached value"
204                            );
205                            Ok(ResolvedValue {
206                                data: cached,
207                                resolved_at: Instant::now(),
208                                from_cache: true,
209                            })
210                        } else {
211                            Err(err)
212                        }
213                    }
214                    ErrorPolicy::UseDefault => {
215                        if let Some(default) = &source.default {
216                            tracing::warn!(
217                                source_id = %source.id,
218                                error = %err,
219                                "Source resolution failed, using default value"
220                            );
221                            let json_default = yaml_value_to_json(default);
222                            Ok(ResolvedValue {
223                                data: json_default,
224                                resolved_at: Instant::now(),
225                                from_cache: false,
226                            })
227                        } else {
228                            Err(err)
229                        }
230                    }
231                    ErrorPolicy::Fail => Err(err),
232                }
233            }
234        }
235    }
236}
237
238/// Resolve all sources in a pipeline, returning a map of source_id -> resolved data.
239///
240/// Applies error policies: `use_cached`, `use_default`, or `fail`.
241/// Required sources with `Fail` policy propagate errors immediately.
242/// Optional sources (required=false) that fail are logged and skipped
243/// with a Null fallback value.
244pub async fn resolve_all(
245    resolver: &dyn SourceResolver,
246    sources: &[DynamicSource],
247) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
248    resolve_all_with_state(resolver, sources, None).await
249}
250
251/// Like [`resolve_all`] but also updates a
252/// [`PipelineState`](rsigma_eval::pipeline::state::PipelineState) with
253/// source resolution status.
254pub async fn resolve_all_with_state(
255    resolver: &dyn SourceResolver,
256    sources: &[DynamicSource],
257    mut state: Option<&mut rsigma_eval::pipeline::state::PipelineState>,
258) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
259    let mut resolved = std::collections::HashMap::new();
260    for source in sources {
261        match resolver.resolve(source).await {
262            Ok(value) => {
263                resolved.insert(source.id.clone(), value.data);
264                if let Some(s) = state.as_deref_mut() {
265                    s.mark_source_resolved(&source.id);
266                }
267            }
268            Err(e) => {
269                if let Some(s) = state.as_deref_mut() {
270                    s.mark_source_failed(&source.id);
271                }
272                if source.required {
273                    return Err(e);
274                }
275                tracing::warn!(
276                    source_id = %source.id,
277                    error = %e,
278                    "Optional source resolution failed, using null"
279                );
280                resolved.insert(source.id.clone(), serde_json::Value::Null);
281            }
282        }
283    }
284    Ok(resolved)
285}
286
287/// Convert a `yaml_serde::Value` to a `serde_json::Value`.
288pub fn yaml_value_to_json(yaml: &yaml_serde::Value) -> serde_json::Value {
289    match yaml {
290        yaml_serde::Value::Null => serde_json::Value::Null,
291        yaml_serde::Value::Bool(b) => serde_json::Value::Bool(*b),
292        yaml_serde::Value::Number(n) => {
293            if let Some(i) = n.as_i64() {
294                serde_json::Value::Number(i.into())
295            } else if let Some(u) = n.as_u64() {
296                serde_json::Value::Number(u.into())
297            } else if let Some(f) = n.as_f64() {
298                serde_json::json!(f)
299            } else {
300                serde_json::Value::Null
301            }
302        }
303        yaml_serde::Value::String(s) => serde_json::Value::String(s.clone()),
304        yaml_serde::Value::Sequence(seq) => {
305            serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
306        }
307        yaml_serde::Value::Mapping(map) => {
308            let obj = map
309                .iter()
310                .map(|(k, v)| {
311                    let key = match k {
312                        yaml_serde::Value::String(s) => s.clone(),
313                        other => format!("{other:?}"),
314                    };
315                    (key, yaml_value_to_json(v))
316                })
317                .collect();
318            serde_json::Value::Object(obj)
319        }
320        yaml_serde::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
321    }
322}