Skip to main content

sanitize_engine/processor/
registry.rs

//! Processor registry — discovers and dispatches structured processors.
//!
//! The [`ProcessorRegistry`] holds a set of registered [`Processor`]
//! implementations and provides methods to:
//!
//! 1. Look up a processor by name.
//! 2. Auto-detect a processor for given content + profile.
//! 3. Process content using a matching processor, returning `Ok(None)`
//!    if no processor matches (the caller can then fall back to the
//!    streaming scanner).
10
11use super::{FileTypeProfile, Processor};
12use crate::error::Result;
13use crate::store::MappingStore;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17/// Registry of structured processors.
18///
19/// Thread-safe (processors are `Arc<dyn Processor>`) and can be shared
20/// across threads via `Arc<ProcessorRegistry>`.
21pub struct ProcessorRegistry {
22    /// Processors indexed by name.
23    processors: HashMap<String, Arc<dyn Processor>>,
24}
25
26impl ProcessorRegistry {
27    /// Create an empty registry.
28    #[must_use]
29    pub fn new() -> Self {
30        Self {
31            processors: HashMap::new(),
32        }
33    }
34
35    /// Create a registry pre-populated with all built-in processors.
36    #[must_use]
37    pub fn with_builtins() -> Self {
38        let mut reg = Self::new();
39        let kv: Arc<dyn Processor> = Arc::new(super::key_value::KeyValueProcessor);
40        reg.processors.insert("key_value".into(), Arc::clone(&kv));
41        reg.processors.insert("key-value".into(), kv);
42        reg.register(Arc::new(super::json_proc::JsonProcessor));
43        reg.register(Arc::new(super::jsonl_proc::JsonLinesProcessor));
44        reg.register(Arc::new(super::yaml_proc::YamlProcessor));
45        reg.register(Arc::new(super::xml_proc::XmlProcessor));
46        reg.register(Arc::new(super::csv_proc::CsvProcessor));
47        reg.register(Arc::new(super::toml_proc::TomlProcessor));
48        reg.register(Arc::new(super::env_proc::EnvProcessor));
49        reg.register(Arc::new(super::ini_proc::IniProcessor));
50        reg.register(Arc::new(super::log_line::LogLineProcessor::new()));
51        reg
52    }
53
54    /// Register a processor. Overwrites any existing processor with the
55    /// same name.
56    pub fn register(&mut self, processor: Arc<dyn Processor>) {
57        self.processors
58            .insert(processor.name().to_string(), processor);
59    }
60
61    /// Look up a processor by its name.
62    pub fn get(&self, name: &str) -> Option<&Arc<dyn Processor>> {
63        self.processors.get(name)
64    }
65
66    /// List all registered processor names.
67    pub fn names(&self) -> Vec<&str> {
68        self.processors.keys().map(|s| s.as_str()).collect()
69    }
70
71    /// Number of registered processors.
72    #[must_use]
73    pub fn len(&self) -> usize {
74        self.processors.len()
75    }
76
77    /// Whether the registry is empty.
78    #[must_use]
79    pub fn is_empty(&self) -> bool {
80        self.processors.is_empty()
81    }
82
83    /// Find a processor that can handle the given content + profile.
84    ///
85    /// 1. If the profile names a specific processor, look it up directly.
86    /// 2. Otherwise, iterate all processors and return the first whose
87    ///    `can_handle` returns `true`.
88    ///
89    /// Returns `None` if no processor matches (caller should fall back
90    /// to the streaming scanner).
91    pub fn find_processor(
92        &self,
93        content: &[u8],
94        profile: &FileTypeProfile,
95    ) -> Option<&Arc<dyn Processor>> {
96        // Direct lookup by profile's processor name.
97        if let Some(proc) = self.processors.get(&profile.processor) {
98            if proc.can_handle(content, profile) {
99                return Some(proc);
100            }
101        }
102
103        // Auto-detect: first matching processor.
104        self.processors
105            .values()
106            .find(|proc| proc.can_handle(content, profile))
107    }
108
109    /// Process content using the matching processor.
110    ///
111    /// Returns `Ok(Some(output))` if a processor matched and succeeded,
112    /// `Ok(None)` if no processor matches (caller should fall back),
113    /// or `Err(...)` if processing failed.
114    ///
115    /// # Errors
116    ///
117    /// Returns the underlying processor's error if processing fails.
118    pub fn process(
119        &self,
120        content: &[u8],
121        profile: &FileTypeProfile,
122        store: &MappingStore,
123    ) -> Result<Option<Vec<u8>>> {
124        match self.find_processor(content, profile) {
125            Some(proc) => {
126                let output = proc.process(content, profile, store)?;
127                Ok(Some(output))
128            }
129            None => Ok(None),
130        }
131    }
132}
133
134impl Default for ProcessorRegistry {
135    fn default() -> Self {
136        Self::with_builtins()
137    }
138}