Skip to main content

rsigma_runtime/sources/
registry.rs

1//! Daemon-wide source registry that merges external (`--source`) and
2//! pipeline-declared dynamic sources with collision-error semantics.
3
4use std::collections::{HashMap, HashSet};
5use std::fmt;
6use std::path::PathBuf;
7
8use rsigma_eval::pipeline::sources::DynamicSource;
9
/// Where a dynamic source was declared, used in diagnostics and API responses.
///
/// Derives `Hash` alongside `PartialEq`/`Eq` so origins can serve as
/// `HashMap`/`HashSet` keys (for example, to group sources by declaration
/// site).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceOrigin {
    /// Declared in a standalone sources file loaded via `--source`; carries
    /// the path the source was loaded from.
    External(PathBuf),
    /// Declared inline in a pipeline file's `sources:` block; carries the
    /// pipeline name.
    Pipeline(String),
}
18
19impl fmt::Display for SourceOrigin {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        match self {
22            Self::External(path) => write!(f, "external:{}", path.display()),
23            Self::Pipeline(name) => write!(f, "pipeline:{name}"),
24        }
25    }
26}
27
28/// A single entry in the registry: the source plus its declaration origin.
29#[derive(Debug, Clone)]
30pub struct RegistryEntry {
31    pub source: DynamicSource,
32    pub origin: SourceOrigin,
33}
34
35/// Daemon-scoped registry of all dynamic sources across both external
36/// `--source` files and pipeline-embedded `sources:` blocks.
37///
38/// Construction enforces collision-error semantics: a source ID declared
39/// in two different sites (or twice in external files) is a hard startup
40/// error with the offending file paths quoted in the message.
41#[derive(Debug, Clone)]
42pub struct DaemonSourceRegistry {
43    entries: Vec<RegistryEntry>,
44    ids: HashSet<String>,
45}
46
47/// Error returned when two source declarations use the same ID.
48#[derive(Debug, Clone)]
49pub struct SourceCollisionError {
50    pub source_id: String,
51    pub first: SourceOrigin,
52    pub second: SourceOrigin,
53}
54
55impl fmt::Display for SourceCollisionError {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        write!(
58            f,
59            "source ID '{}' declared in both {} and {}",
60            self.source_id, self.first, self.second
61        )
62    }
63}
64
65impl std::error::Error for SourceCollisionError {}
66
67impl DaemonSourceRegistry {
68    /// Build a registry from external sources and pipeline-declared sources.
69    ///
70    /// Returns `Err` if any source ID appears more than once across all
71    /// declaration sites.
72    pub fn new(
73        external: Vec<(DynamicSource, PathBuf)>,
74        pipeline_sources: Vec<(DynamicSource, String)>,
75    ) -> Result<Self, SourceCollisionError> {
76        let mut seen: HashMap<String, SourceOrigin> = HashMap::new();
77        let mut entries = Vec::with_capacity(external.len() + pipeline_sources.len());
78
79        for (source, path) in external {
80            let origin = SourceOrigin::External(path);
81            if let Some(prev) = seen.get(&source.id) {
82                return Err(SourceCollisionError {
83                    source_id: source.id.clone(),
84                    first: prev.clone(),
85                    second: origin,
86                });
87            }
88            seen.insert(source.id.clone(), origin.clone());
89            entries.push(RegistryEntry { source, origin });
90        }
91
92        for (source, pipeline_name) in pipeline_sources {
93            let origin = SourceOrigin::Pipeline(pipeline_name);
94            if let Some(prev) = seen.get(&source.id) {
95                return Err(SourceCollisionError {
96                    source_id: source.id.clone(),
97                    first: prev.clone(),
98                    second: origin,
99                });
100            }
101            seen.insert(source.id.clone(), origin.clone());
102            entries.push(RegistryEntry { source, origin });
103        }
104
105        let ids = seen.into_keys().collect();
106        Ok(Self { entries, ids })
107    }
108
109    /// Build a registry from only external sources (no pipeline sources).
110    pub fn from_external(
111        external: Vec<(DynamicSource, PathBuf)>,
112    ) -> Result<Self, SourceCollisionError> {
113        Self::new(external, Vec::new())
114    }
115
116    /// Build an empty registry (no sources at all).
117    pub fn empty() -> Self {
118        Self {
119            entries: Vec::new(),
120            ids: HashSet::new(),
121        }
122    }
123
124    /// All sources in the registry.
125    pub fn sources(&self) -> Vec<&DynamicSource> {
126        self.entries.iter().map(|e| &e.source).collect()
127    }
128
129    /// All owned sources in the registry.
130    pub fn into_sources(self) -> Vec<DynamicSource> {
131        self.entries.into_iter().map(|e| e.source).collect()
132    }
133
134    /// All entries (source + origin) in the registry.
135    pub fn entries(&self) -> &[RegistryEntry] {
136        &self.entries
137    }
138
139    /// The set of all declared source IDs.
140    pub fn ids(&self) -> &HashSet<String> {
141        &self.ids
142    }
143
144    /// Whether the registry has any sources.
145    pub fn is_empty(&self) -> bool {
146        self.entries.is_empty()
147    }
148
149    /// Number of sources in the registry.
150    pub fn len(&self) -> usize {
151        self.entries.len()
152    }
153
154    /// Look up a source by ID.
155    pub fn get(&self, id: &str) -> Option<&RegistryEntry> {
156        self.entries.iter().find(|e| e.source.id == id)
157    }
158}
159
160/// Load external sources from `--source` paths. Each path is either a
161/// single YAML file or a directory of `*.yml`/`*.yaml` files.
162pub fn load_external_sources(
163    paths: &[PathBuf],
164) -> Result<Vec<(DynamicSource, PathBuf)>, rsigma_eval::EvalError> {
165    let mut result = Vec::new();
166    for path in paths {
167        if path.is_dir() {
168            let sources = rsigma_eval::parse_sources_dir(path)?;
169            for source in sources {
170                result.push((source, path.clone()));
171            }
172        } else {
173            let sources = rsigma_eval::parse_sources_file(path)?;
174            for source in sources {
175                result.push((source, path.clone()));
176            }
177        }
178    }
179    Ok(result)
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use rsigma_eval::pipeline::sources::{
        DataFormat, DynamicSource, ErrorPolicy, RefreshPolicy, SourceType,
    };

    /// Minimal file-backed source with the given ID; every other field is a
    /// fixed placeholder because the registry only inspects `id`.
    fn file_source(id: &str) -> DynamicSource {
        DynamicSource {
            id: id.to_string(),
            source_type: SourceType::File {
                path: PathBuf::from("/tmp/test.json"),
                format: DataFormat::Json,
                extract: None,
            },
            refresh: RefreshPolicy::Once,
            timeout: None,
            on_error: ErrorPolicy::UseCached,
            required: true,
            default: None,
        }
    }

    #[test]
    fn no_collision_different_ids() {
        let external = vec![
            (file_source("a"), PathBuf::from("sources.yml")),
            (file_source("b"), PathBuf::from("sources.yml")),
        ];
        let pipeline = vec![(file_source("c"), "my_pipeline".to_string())];
        let registry = DaemonSourceRegistry::new(external, pipeline).unwrap();
        assert_eq!(registry.len(), 3);
        assert!(!registry.is_empty());
        assert_eq!(registry.ids().len(), 3);
        assert!(registry.ids().contains("a"));
        assert!(registry.ids().contains("b"));
        assert!(registry.ids().contains("c"));

        // External sources come first, then pipeline sources, each in the
        // order they were supplied.
        let order: Vec<&str> = registry
            .sources()
            .into_iter()
            .map(|s| s.id.as_str())
            .collect();
        assert_eq!(order, ["a", "b", "c"]);
    }

    #[test]
    fn get_returns_entry_with_origin() {
        let external = vec![(file_source("ext"), PathBuf::from("sources.yml"))];
        let pipeline = vec![(file_source("pipe"), "my_pipeline".to_string())];
        let registry = DaemonSourceRegistry::new(external, pipeline).unwrap();

        let ext = registry.get("ext").expect("external source is registered");
        assert_eq!(ext.source.id, "ext");
        assert_eq!(
            ext.origin,
            SourceOrigin::External(PathBuf::from("sources.yml"))
        );

        let pipe = registry.get("pipe").expect("pipeline source is registered");
        assert_eq!(pipe.source.id, "pipe");
        assert_eq!(pipe.origin, SourceOrigin::Pipeline("my_pipeline".to_string()));

        assert!(registry.get("missing").is_none());
    }

    #[test]
    fn from_external_registers_only_external_sources() {
        let external = vec![(file_source("only"), PathBuf::from("sources.yml"))];
        let registry = DaemonSourceRegistry::from_external(external).unwrap();
        assert_eq!(registry.len(), 1);
        assert_eq!(registry.entries().len(), 1);
        assert_eq!(
            registry.entries()[0].origin,
            SourceOrigin::External(PathBuf::from("sources.yml"))
        );

        let owned = registry.into_sources();
        assert_eq!(owned.len(), 1);
        assert_eq!(owned[0].id, "only");
    }

    #[test]
    fn collision_within_external() {
        let external = vec![
            (file_source("dup"), PathBuf::from("a.yml")),
            (file_source("dup"), PathBuf::from("b.yml")),
        ];
        let err = DaemonSourceRegistry::new(external, Vec::new()).unwrap_err();
        assert_eq!(err.source_id, "dup");
        assert_eq!(err.first, SourceOrigin::External(PathBuf::from("a.yml")));
        assert_eq!(err.second, SourceOrigin::External(PathBuf::from("b.yml")));
        assert!(err.to_string().contains("a.yml"));
        assert!(err.to_string().contains("b.yml"));
    }

    #[test]
    fn collision_external_vs_pipeline() {
        let external = vec![(file_source("shared"), PathBuf::from("ext.yml"))];
        let pipeline = vec![(file_source("shared"), "pipe1".to_string())];
        let err = DaemonSourceRegistry::new(external, pipeline).unwrap_err();
        assert_eq!(err.source_id, "shared");
        // The external declaration is registered first, so it is reported as
        // the original and the pipeline declaration as the offender.
        assert_eq!(err.first, SourceOrigin::External(PathBuf::from("ext.yml")));
        assert_eq!(err.second, SourceOrigin::Pipeline("pipe1".to_string()));
        assert!(err.to_string().contains("external:ext.yml"));
        assert!(err.to_string().contains("pipeline:pipe1"));
    }

    #[test]
    fn collision_within_pipelines() {
        let pipeline = vec![
            (file_source("dup"), "pipe1".to_string()),
            (file_source("dup"), "pipe2".to_string()),
        ];
        let err = DaemonSourceRegistry::new(Vec::new(), pipeline).unwrap_err();
        assert_eq!(err.source_id, "dup");
        assert_eq!(err.first, SourceOrigin::Pipeline("pipe1".to_string()));
        assert_eq!(err.second, SourceOrigin::Pipeline("pipe2".to_string()));
    }

    #[test]
    fn empty_registry() {
        let registry = DaemonSourceRegistry::empty();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert!(registry.ids().is_empty());
        assert!(registry.entries().is_empty());
        assert!(registry.sources().is_empty());
        assert!(registry.get("anything").is_none());
    }
}
245}