Skip to main content

sanitize_engine/processor/
registry.rs

1//! Processor registry — discovers and dispatches structured processors.
2//!
3//! The [`ProcessorRegistry`] holds a set of registered [`Processor`]
4//! implementations and provides methods to:
5//!
6//! 1. Look up a processor by name.
7//! 2. Auto-detect a processor for given content + profile.
8//! 3. Process content using a matching processor, falling back to `None`
9//!    if no processor matches (caller can then use the streaming scanner).
10
11use super::{FileTypeProfile, Processor};
12use crate::error::Result;
13use crate::store::MappingStore;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17/// Registry of structured processors.
18///
19/// Thread-safe (processors are `Arc<dyn Processor>`) and can be shared
20/// across threads via `Arc<ProcessorRegistry>`.
21pub struct ProcessorRegistry {
22    /// Processors indexed by name.
23    processors: HashMap<String, Arc<dyn Processor>>,
24}
25
26impl ProcessorRegistry {
27    /// Create an empty registry.
28    #[must_use]
29    pub fn new() -> Self {
30        Self {
31            processors: HashMap::new(),
32        }
33    }
34
35    /// Create a registry pre-populated with all built-in processors.
36    #[must_use]
37    pub fn with_builtins() -> Self {
38        let mut reg = Self::new();
39        let kv: Arc<dyn Processor> = Arc::new(super::key_value::KeyValueProcessor);
40        reg.processors.insert("key_value".into(), Arc::clone(&kv));
41        reg.processors.insert("key-value".into(), kv);
42        reg.register(Arc::new(super::json_proc::JsonProcessor));
43        reg.register(Arc::new(super::jsonl_proc::JsonLinesProcessor));
44        reg.register(Arc::new(super::yaml_proc::YamlProcessor));
45        reg.register(Arc::new(super::xml_proc::XmlProcessor));
46        reg.register(Arc::new(super::csv_proc::CsvProcessor));
47        reg.register(Arc::new(super::toml_proc::TomlProcessor));
48        reg.register(Arc::new(super::env_proc::EnvProcessor));
49        reg.register(Arc::new(super::ini_proc::IniProcessor));
50        reg.register(Arc::new(super::log_line::LogLineProcessor::new()));
51        reg
52    }
53
54    /// Register a processor. Overwrites any existing processor with the
55    /// same name.
56    pub fn register(&mut self, processor: Arc<dyn Processor>) {
57        self.processors
58            .insert(processor.name().to_string(), processor);
59    }
60
61    /// Look up a processor by its name.
62    pub fn get(&self, name: &str) -> Option<&Arc<dyn Processor>> {
63        self.processors.get(name)
64    }
65
66    /// List all registered processor names.
67    pub fn names(&self) -> Vec<&str> {
68        self.processors.keys().map(|s| s.as_str()).collect()
69    }
70
71    /// Number of registered processors.
72    #[must_use]
73    pub fn len(&self) -> usize {
74        self.processors.len()
75    }
76
77    /// Whether the registry is empty.
78    #[must_use]
79    pub fn is_empty(&self) -> bool {
80        self.processors.is_empty()
81    }
82
83    /// Find a processor that can handle the given content + profile.
84    ///
85    /// 1. If the profile names a specific processor, look it up directly.
86    /// 2. Otherwise, iterate all processors and return the first whose
87    ///    `can_handle` returns `true`.
88    ///
89    /// Returns `None` if no processor matches (caller should fall back
90    /// to the streaming scanner).
91    pub fn find_processor(
92        &self,
93        content: &[u8],
94        profile: &FileTypeProfile,
95    ) -> Option<&Arc<dyn Processor>> {
96        // Direct lookup by profile's processor name.
97        if let Some(proc) = self.processors.get(&profile.processor) {
98            if proc.can_handle(content, profile) {
99                return Some(proc);
100            }
101        }
102
103        // Auto-detect: first matching processor.
104        self.processors
105            .values()
106            .find(|proc| proc.can_handle(content, profile))
107    }
108
109    /// Process content using the matching processor.
110    ///
111    /// Returns `Ok(Some(output))` if a processor matched and succeeded,
112    /// `Ok(None)` if no processor matches (caller should fall back),
113    /// or `Err(...)` if processing failed.
114    ///
115    /// # Errors
116    ///
117    /// Returns the underlying processor's error if processing fails.
118    pub fn process(
119        &self,
120        content: &[u8],
121        profile: &FileTypeProfile,
122        store: &MappingStore,
123    ) -> Result<Option<Vec<u8>>> {
124        match self.find_processor(content, profile) {
125            Some(proc) => {
126                let output = proc.process(content, profile, store)?;
127                Ok(Some(output))
128            }
129            None => Ok(None),
130        }
131    }
132}
133
134impl Default for ProcessorRegistry {
135    fn default() -> Self {
136        Self::with_builtins()
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::category::Category;
    use crate::generator::HmacGenerator;
    use crate::processor::profile::{FieldRule, FileTypeProfile};
    use std::sync::Arc;

    /// Build a mapping store backed by an HMAC generator with a fixed key.
    fn make_store() -> MappingStore {
        let generator = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(generator, None)
    }

    /// A `json` profile with no field rules, bound to the `.json` extension.
    fn empty_json_profile() -> FileTypeProfile {
        FileTypeProfile::new("json", vec![]).with_extension(".json")
    }

    #[test]
    fn new_registry_is_empty() {
        let registry = ProcessorRegistry::new();
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    #[test]
    fn with_builtins_registers_known_processors() {
        let registry = ProcessorRegistry::with_builtins();
        assert!(!registry.is_empty());

        let registered = registry.names();
        for expected in &["json", "yaml", "xml", "csv", "toml", "jsonl"] {
            assert!(
                registered.contains(expected),
                "missing processor: {expected}"
            );
        }
    }

    #[test]
    fn register_and_get_roundtrip() {
        let mut registry = ProcessorRegistry::new();
        registry.register(Arc::new(crate::processor::json_proc::JsonProcessor));

        // Only the registered name resolves.
        assert!(registry.get("json").is_some());
        assert!(registry.get("xml").is_none());
    }

    #[test]
    fn register_overwrites_existing() {
        let mut registry = ProcessorRegistry::new();
        // Registering the same processor name twice must leave one entry.
        for _ in 0..2 {
            registry.register(Arc::new(crate::processor::json_proc::JsonProcessor));
        }
        assert_eq!(registry.len(), 1);
    }

    #[test]
    fn names_lists_all_registered() {
        let mut registry = ProcessorRegistry::new();
        registry.register(Arc::new(crate::processor::json_proc::JsonProcessor));
        registry.register(Arc::new(crate::processor::yaml_proc::YamlProcessor));

        let registered = registry.names();
        assert_eq!(registered.len(), 2);
        assert!(registered.contains(&"json"));
        assert!(registered.contains(&"yaml"));
    }

    #[test]
    fn find_processor_by_profile_name() {
        let registry = ProcessorRegistry::with_builtins();
        let profile = empty_json_profile();
        let content = b"{}";
        assert!(registry.find_processor(content, &profile).is_some());
    }

    #[test]
    fn find_processor_returns_none_for_unrecognised_content() {
        // Nothing is registered, so no processor can match.
        let registry = ProcessorRegistry::new();
        let profile = empty_json_profile();
        assert!(registry.find_processor(b"{}", &profile).is_none());
    }

    #[test]
    fn process_returns_some_for_matching_content() {
        let registry = ProcessorRegistry::with_builtins();
        let store = make_store();

        let rule = FieldRule::new("*.secret").with_category(Category::Custom("s".into()));
        let profile = FileTypeProfile::new("json", vec![rule]).with_extension(".json");

        let outcome = registry
            .process(br#"{"secret":"abc"}"#, &profile, &store)
            .unwrap();
        assert!(outcome.is_some());
    }

    #[test]
    fn process_returns_none_when_no_processor_matches() {
        // Nothing is registered, so `process` must report "no match".
        let registry = ProcessorRegistry::new();
        let store = make_store();
        let profile = empty_json_profile();

        let outcome = registry.process(b"{}", &profile, &store).unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn default_impl_gives_builtins() {
        let registry = ProcessorRegistry::default();
        assert!(registry.get("json").is_some());
    }
}