Skip to main content

anomalyx_normalize/
parser.rs

//! The format-parser plugin contract.
//!
//! Each file format is an independent [`FormatParser`] (one per file under
//! `parsers/`). A [`ParserRegistry`] resolves a byte stream to a parser — by
//! file extension first, then by content sniff — and asks it to produce
//! columns. Adding a format means writing a `parsers/<fmt>.rs`, implementing the
//! trait, and registering it in [`crate::parsers::default_registry`]; there is
//! no central `match` to edit.
//!
//! This mirrors the `Detector`/`Registry` pattern in `ax-detect`: explicit
//! registration (so formats are feature-gateable and the set is deterministic),
//! not runtime dynamic loading.
12
13use ax_core::{AxError, Column, RecordSet};
14
/// Strength of a content-sniff match. The larger value wins; equal values fall
/// back to registration order, which keeps resolution deterministic. Reach for
/// the named levels below instead of literal numbers.
pub type Confidence = u16;

/// Binary magic bytes matched (e.g. Parquet's `PAR1`) — unambiguous.
pub const MAGIC: Confidence = 100;

/// A distinctive textual shape that must beat the generic fallback, such as
/// NDJSON's one-object-per-line layout.
pub const STRONG: Confidence = 60;

/// A recognizable textual shape, e.g. a single JSON document or tab-delimited rows.
pub const TEXT: Confidence = 50;

/// Weakest possible claim: CSV accepts any otherwise-unclaimed text as
/// comma-delimited.
pub const FALLBACK: Confidence = 1;
28
29/// One file format. Implementors live in `parsers/` — one per format.
30pub trait FormatParser: Send + Sync {
31    /// Stable identifier, recorded in the envelope's `format` field (e.g. `"csv"`).
32    fn id(&self) -> &'static str;
33
34    /// File extensions this parser claims (lower-case, no dot).
35    fn extensions(&self) -> &'static [&'static str];
36
37    /// How strongly `bytes` looks like this format, or `None` if it clearly is
38    /// not. Used only when the extension doesn't decide.
39    fn sniff(&self, bytes: &[u8]) -> Option<Confidence>;
40
41    /// Parse `bytes` (from logical `source`) into columns.
42    fn parse(&self, source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError>;
43}
44
45/// An ordered set of format parsers.
46pub struct ParserRegistry {
47    parsers: Vec<Box<dyn FormatParser>>,
48}
49
50impl ParserRegistry {
51    pub fn new() -> Self {
52        ParserRegistry {
53            parsers: Vec::new(),
54        }
55    }
56
57    pub fn register(&mut self, parser: Box<dyn FormatParser>) -> &mut Self {
58        self.parsers.push(parser);
59        self
60    }
61
62    /// Registered parser ids, in order (handy for `describe`/tests).
63    pub fn ids(&self) -> Vec<&'static str> {
64        self.parsers.iter().map(|p| p.id()).collect()
65    }
66
67    /// The lower-cased final extension of `source`, if any.
68    fn extension(source: &str) -> Option<String> {
69        source.rsplit('.').next().map(|e| e.to_ascii_lowercase())
70    }
71
72    /// Resolves the parser for `source`/`bytes`: a matching file extension wins;
73    /// otherwise the highest-confidence content sniff (first registered on a
74    /// tie). An unrecognized stream is [`AxError::UnknownFormat`], never a guess.
75    pub fn resolve(&self, source: &str, bytes: &[u8]) -> Result<&dyn FormatParser, AxError> {
76        if let Some(ext) = Self::extension(source) {
77            if let Some(p) = self
78                .parsers
79                .iter()
80                .find(|p| p.extensions().contains(&ext.as_str()))
81            {
82                return Ok(p.as_ref());
83            }
84        }
85        // Highest sniff confidence; strict `>` keeps the first registered winner.
86        let mut best: Option<(Confidence, &dyn FormatParser)> = None;
87        for p in &self.parsers {
88            if let Some(c) = p.sniff(bytes) {
89                if best.is_none_or(|(bc, _)| c > bc) {
90                    best = Some((c, p.as_ref()));
91                }
92            }
93        }
94        best.map(|(_, p)| p)
95            .ok_or_else(|| AxError::UnknownFormat(source.to_string()))
96    }
97
98    /// Resolve, parse, and wrap into a [`RecordSet`] tagged with the parser id.
99    pub fn normalize(&self, source: &str, bytes: &[u8]) -> Result<RecordSet, AxError> {
100        let parser = self.resolve(source, bytes)?;
101        let columns = parser.parse(source, bytes)?;
102        Ok(RecordSet::new(source, parser.id(), columns))
103    }
104
105    /// Normalize with an explicitly chosen parser id (skips detection).
106    pub fn normalize_with(
107        &self,
108        id: &str,
109        source: &str,
110        bytes: &[u8],
111    ) -> Result<RecordSet, AxError> {
112        let parser = self
113            .parsers
114            .iter()
115            .find(|p| p.id() == id)
116            .ok_or_else(|| AxError::Config(format!("unknown format id '{id}'")))?;
117        Ok(RecordSet::new(
118            source,
119            parser.id(),
120            parser.parse(source, bytes)?,
121        ))
122    }
123}
124
125impl Default for ParserRegistry {
126    fn default() -> Self {
127        crate::parsers::default_registry()
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh stock registry.
    fn registry() -> ParserRegistry {
        ParserRegistry::default()
    }

    /// Resolves `source`/`bytes` against the stock registry and returns the
    /// winning parser's id, panicking if resolution fails.
    fn resolved_id(source: &str, bytes: &[u8]) -> &'static str {
        registry().resolve(source, bytes).unwrap().id()
    }

    #[test]
    fn extension_wins_over_content() {
        // JSON-looking bytes still go to CSV because of the `.csv` extension.
        assert_eq!(resolved_id("data.csv", b"{\"a\":1}"), "csv");
    }

    #[test]
    fn sniff_used_without_a_known_extension() {
        for (input, want) in [
            ("a,b\n1,2", "csv"),
            ("a\tb\n1\t2", "tsv"),
            ("[{\"a\":1}]", "json"),
            ("{\"a\":1}\n{\"a\":2}\n", "ndjson"),
        ] {
            assert_eq!(resolved_id("-", input.as_bytes()), want, "sniffing {input:?}");
        }
    }

    #[cfg(feature = "polars")]
    #[test]
    fn binary_magic_outranks_text_sniff() {
        assert_eq!(resolved_id("-", b"PAR1\x00\x01x"), "parquet");
        assert_eq!(resolved_id("-", b"ARROW1\x00\x00x"), "arrow");
    }

    #[test]
    fn csv_mentioning_par1_is_still_csv() {
        // `PAR1` appearing inside a data cell is not a Parquet magic number.
        assert_eq!(resolved_id("-", b"a,b\nPAR1,2"), "csv");
    }

    #[test]
    fn unrecognized_stream_errors() {
        let r = registry();
        let outcome = r.resolve("-", &[0x00, 0x01, 0x02, 0xff]);
        assert!(matches!(outcome, Err(AxError::UnknownFormat(_))));
    }

    #[test]
    fn extension_overrides_content_sniff() {
        // The extension forces the parser even when the bytes would sniff as
        // something else, which pins each parser's `extensions()`.
        let r = registry();
        for (source, input, want) in [
            ("x.tsv", "a,b\n1,2", "tsv"),
            ("x.tab", "a,b\n1,2", "tsv"),
            ("x.json", "a,b", "json"),
            ("x.jsonl", "a,b", "ndjson"),
            ("x.csv", "a\tb", "csv"),
        ] {
            assert_eq!(r.resolve(source, input.as_bytes()).unwrap().id(), want);
        }
    }

    #[cfg(feature = "polars")]
    #[test]
    fn binary_extensions_resolve() {
        let r = registry();
        for (source, want) in [
            ("x.parquet", "parquet"),
            ("x.pq", "parquet"),
            ("x.feather", "arrow"),
            ("x.ipc", "arrow"),
        ] {
            assert_eq!(r.resolve(source, b"zz").unwrap().id(), want);
        }
    }

    #[test]
    fn default_registry_lists_all_formats() {
        // Registration order is the deterministic tie-break. Binary readers are
        // feature-gated, so the expected order is assembled per enabled feature.
        let mut want: Vec<&str> = Vec::new();
        #[cfg(feature = "polars")]
        want.extend(["parquet", "arrow"]);
        #[cfg(feature = "evtx")]
        want.push("evtx");
        #[cfg(feature = "pcap")]
        want.push("pcap");
        #[cfg(feature = "xlsx")]
        want.push("xlsx");
        #[cfg(feature = "sqlite")]
        want.push("sqlite");
        #[cfg(feature = "datalake")]
        want.extend(["avro", "orc"]);
        want.extend([
            "otlp",
            "cloudtrail",
            "eve",
            "journal",
            "osquery",
            "ndjson",
            "zeek",
            "logfmt",
            "accesslog",
            "syslog",
            "cef",
            "leef",
            "auditd",
            "dns",
            "prometheus",
            "xml",
            "json",
            "yaml",
            "toml",
            "ini",
            "netflow",
            "vpcflow",
            "tsv",
            "csv",
        ]);
        assert_eq!(registry().ids(), want);
    }
}