1use std::collections::{HashMap, HashSet};
5use std::fmt;
6use std::path::PathBuf;
7
8use rsigma_eval::pipeline::sources::DynamicSource;
9
/// Where a dynamic source declaration came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceOrigin {
    /// Loaded from an external sources file or directory at this path.
    External(PathBuf),
    /// Declared by the pipeline with this name.
    Pipeline(String),
}

impl fmt::Display for SourceOrigin {
    /// Renders the origin as `external:<path>` or `pipeline:<name>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SourceOrigin::External(path) => {
                f.write_str("external:")?;
                write!(f, "{}", path.display())
            }
            SourceOrigin::Pipeline(name) => {
                f.write_str("pipeline:")?;
                f.write_str(name)
            }
        }
    }
}
27
/// A registered dynamic source paired with where it was declared.
#[derive(Debug, Clone)]
pub struct RegistryEntry {
    /// The source definition itself.
    pub source: DynamicSource,
    /// Where `source` was declared (external file/directory or pipeline).
    pub origin: SourceOrigin,
}
34
/// Registry of dynamic sources whose IDs are guaranteed to be unique.
///
/// Construction through `new`/`from_external` rejects duplicate IDs, so every
/// entry's `source.id` is distinct.
#[derive(Debug, Clone)]
pub struct DaemonSourceRegistry {
    /// Registered sources in registration order: external sources first,
    /// then pipeline sources.
    entries: Vec<RegistryEntry>,
    /// The `source.id` of every element of `entries`, and nothing else.
    ids: HashSet<String>,
}
46
47#[derive(Debug, Clone)]
49pub struct SourceCollisionError {
50 pub source_id: String,
51 pub first: SourceOrigin,
52 pub second: SourceOrigin,
53}
54
55impl fmt::Display for SourceCollisionError {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 write!(
58 f,
59 "source ID '{}' declared in both {} and {}",
60 self.source_id, self.first, self.second
61 )
62 }
63}
64
65impl std::error::Error for SourceCollisionError {}
66
67impl DaemonSourceRegistry {
68 pub fn new(
73 external: Vec<(DynamicSource, PathBuf)>,
74 pipeline_sources: Vec<(DynamicSource, String)>,
75 ) -> Result<Self, SourceCollisionError> {
76 let mut seen: HashMap<String, SourceOrigin> = HashMap::new();
77 let mut entries = Vec::with_capacity(external.len() + pipeline_sources.len());
78
79 for (source, path) in external {
80 let origin = SourceOrigin::External(path);
81 if let Some(prev) = seen.get(&source.id) {
82 return Err(SourceCollisionError {
83 source_id: source.id.clone(),
84 first: prev.clone(),
85 second: origin,
86 });
87 }
88 seen.insert(source.id.clone(), origin.clone());
89 entries.push(RegistryEntry { source, origin });
90 }
91
92 for (source, pipeline_name) in pipeline_sources {
93 let origin = SourceOrigin::Pipeline(pipeline_name);
94 if let Some(prev) = seen.get(&source.id) {
95 return Err(SourceCollisionError {
96 source_id: source.id.clone(),
97 first: prev.clone(),
98 second: origin,
99 });
100 }
101 seen.insert(source.id.clone(), origin.clone());
102 entries.push(RegistryEntry { source, origin });
103 }
104
105 let ids = seen.into_keys().collect();
106 Ok(Self { entries, ids })
107 }
108
109 pub fn from_external(
111 external: Vec<(DynamicSource, PathBuf)>,
112 ) -> Result<Self, SourceCollisionError> {
113 Self::new(external, Vec::new())
114 }
115
116 pub fn empty() -> Self {
118 Self {
119 entries: Vec::new(),
120 ids: HashSet::new(),
121 }
122 }
123
124 pub fn sources(&self) -> Vec<&DynamicSource> {
126 self.entries.iter().map(|e| &e.source).collect()
127 }
128
129 pub fn into_sources(self) -> Vec<DynamicSource> {
131 self.entries.into_iter().map(|e| e.source).collect()
132 }
133
134 pub fn entries(&self) -> &[RegistryEntry] {
136 &self.entries
137 }
138
139 pub fn ids(&self) -> &HashSet<String> {
141 &self.ids
142 }
143
144 pub fn is_empty(&self) -> bool {
146 self.entries.is_empty()
147 }
148
149 pub fn len(&self) -> usize {
151 self.entries.len()
152 }
153
154 pub fn get(&self, id: &str) -> Option<&RegistryEntry> {
156 self.entries.iter().find(|e| e.source.id == id)
157 }
158}
159
160pub fn load_external_sources(
163 paths: &[PathBuf],
164) -> Result<Vec<(DynamicSource, PathBuf)>, rsigma_eval::EvalError> {
165 let mut result = Vec::new();
166 for path in paths {
167 if path.is_dir() {
168 let sources = rsigma_eval::parse_sources_dir(path)?;
169 for source in sources {
170 result.push((source, path.clone()));
171 }
172 } else {
173 let sources = rsigma_eval::parse_sources_file(path)?;
174 for source in sources {
175 result.push((source, path.clone()));
176 }
177 }
178 }
179 Ok(result)
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use rsigma_eval::pipeline::sources::{
        DataFormat, DynamicSource, ErrorPolicy, RefreshPolicy, SourceType,
    };

    /// Builds a minimal file-backed source. Only the ID varies between
    /// calls, so tests exercise ID handling alone.
    fn file_source(id: &str) -> DynamicSource {
        DynamicSource {
            id: id.to_string(),
            source_type: SourceType::File {
                path: PathBuf::from("/tmp/test.json"),
                format: DataFormat::Json,
                extract: None,
            },
            refresh: RefreshPolicy::Once,
            timeout: None,
            on_error: ErrorPolicy::UseCached,
            required: true,
            default: None,
        }
    }

    #[test]
    fn no_collision_different_ids() {
        let external = vec![
            (file_source("a"), PathBuf::from("sources.yml")),
            (file_source("b"), PathBuf::from("sources.yml")),
        ];
        let pipeline = vec![(file_source("c"), "my_pipeline".to_string())];
        let registry = DaemonSourceRegistry::new(external, pipeline).unwrap();
        assert_eq!(registry.len(), 3);
        assert!(registry.ids().contains("a"));
        assert!(registry.ids().contains("b"));
        assert!(registry.ids().contains("c"));
    }

    #[test]
    fn registration_order_is_external_then_pipeline() {
        let external = vec![(file_source("ext"), PathBuf::from("sources.yml"))];
        let pipeline = vec![(file_source("pipe"), "my_pipeline".to_string())];
        let registry = DaemonSourceRegistry::new(external, pipeline).unwrap();

        let borrowed: Vec<&str> = registry
            .sources()
            .into_iter()
            .map(|s| s.id.as_str())
            .collect();
        assert_eq!(borrowed, ["ext", "pipe"]);

        let owned: Vec<String> = registry
            .into_sources()
            .into_iter()
            .map(|s| s.id.clone())
            .collect();
        assert_eq!(owned, ["ext".to_string(), "pipe".to_string()]);
    }

    #[test]
    fn get_returns_entry_with_origin() {
        let external = vec![(file_source("a"), PathBuf::from("sources.yml"))];
        let pipeline = vec![(file_source("b"), "my_pipeline".to_string())];
        let registry = DaemonSourceRegistry::new(external, pipeline).unwrap();
        assert_eq!(
            registry.get("a").unwrap().origin,
            SourceOrigin::External(PathBuf::from("sources.yml"))
        );
        assert_eq!(
            registry.get("b").unwrap().origin,
            SourceOrigin::Pipeline("my_pipeline".to_string())
        );
        assert!(registry.get("missing").is_none());
    }

    #[test]
    fn collision_within_external() {
        let external = vec![
            (file_source("dup"), PathBuf::from("a.yml")),
            (file_source("dup"), PathBuf::from("b.yml")),
        ];
        let err = DaemonSourceRegistry::new(external, Vec::new()).unwrap_err();
        assert_eq!(err.source_id, "dup");
        assert!(err.to_string().contains("a.yml"));
        assert!(err.to_string().contains("b.yml"));
    }

    #[test]
    fn collision_within_pipeline() {
        let pipeline = vec![
            (file_source("dup"), "p1".to_string()),
            (file_source("dup"), "p2".to_string()),
        ];
        let err = DaemonSourceRegistry::new(Vec::new(), pipeline).unwrap_err();
        assert_eq!(err.source_id, "dup");
        assert_eq!(err.first, SourceOrigin::Pipeline("p1".to_string()));
        assert_eq!(err.second, SourceOrigin::Pipeline("p2".to_string()));
    }

    #[test]
    fn collision_external_vs_pipeline() {
        let external = vec![(file_source("shared"), PathBuf::from("ext.yml"))];
        let pipeline = vec![(file_source("shared"), "pipe1".to_string())];
        let err = DaemonSourceRegistry::new(external, pipeline).unwrap_err();
        assert_eq!(err.source_id, "shared");
        // External sources are registered first, so they are always `first`.
        assert_eq!(err.first, SourceOrigin::External(PathBuf::from("ext.yml")));
        assert_eq!(err.second, SourceOrigin::Pipeline("pipe1".to_string()));
        assert_eq!(
            err.to_string(),
            "source ID 'shared' declared in both external:ext.yml and pipeline:pipe1"
        );
    }

    #[test]
    fn from_external_registers_only_external() {
        let registry =
            DaemonSourceRegistry::from_external(vec![(file_source("a"), PathBuf::from("a.yml"))])
                .unwrap();
        assert_eq!(registry.len(), 1);
        assert!(registry.ids().contains("a"));
        assert_eq!(
            registry.entries()[0].origin,
            SourceOrigin::External(PathBuf::from("a.yml"))
        );
    }

    #[test]
    fn empty_registry() {
        let registry = DaemonSourceRegistry::empty();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert!(registry.ids().is_empty());
        assert!(registry.get("a").is_none());
    }
}