Skip to main content

anomalyx_normalize/
parser.rs

1//! The format-parser plugin contract.
2//!
3//! Each file format is an independent [`FormatParser`] (one per file under
4//! `parsers/`). A [`ParserRegistry`] resolves a byte stream to a parser — by
5//! file extension first, then by content sniff — and asks it to produce
6//! columns. Adding a format is: write a `parsers/<fmt>.rs`, implement the trait,
7//! and register it in [`parsers::default_registry`]. No central `match` to edit.
8//!
9//! This mirrors the `Detector`/`Registry` pattern in `ax-detect`: explicit
10//! registration (so formats are feature-gateable and the set is deterministic),
11//! not runtime dynamic loading.
12
13use ax_core::{AxError, Column, RecordSet};
14
/// Score a parser returns from a content sniff.
///
/// The registry picks the highest score; when two parsers return the same
/// score the one registered first wins, which keeps resolution deterministic.
/// Prefer the named levels below over bare integers.
pub type Confidence = u16;

/// Unambiguous match on a binary magic number (for example Parquet's `PAR1`).
pub const MAGIC: Confidence = 100;
/// Distinctive text shape that must beat the generic fallback (for example
/// NDJSON's one-object-per-line repetition).
pub const STRONG: Confidence = 60;
/// Recognizable text shape, such as a single JSON document or tab-delimited
/// rows.
pub const TEXT: Confidence = 50;
/// Claim of last resort: CSV takes any leftover text as comma-delimited.
pub const FALLBACK: Confidence = 1;
28
29/// One file format. Implementors live in `parsers/` — one per format.
30pub trait FormatParser: Send + Sync {
31    /// Stable identifier, recorded in the envelope's `format` field (e.g. `"csv"`).
32    fn id(&self) -> &'static str;
33
34    /// File extensions this parser claims (lower-case, no dot).
35    fn extensions(&self) -> &'static [&'static str];
36
37    /// How strongly `bytes` looks like this format, or `None` if it clearly is
38    /// not. Used only when the extension doesn't decide.
39    fn sniff(&self, bytes: &[u8]) -> Option<Confidence>;
40
41    /// Parse `bytes` (from logical `source`) into columns.
42    fn parse(&self, source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError>;
43}
44
45/// An ordered set of format parsers.
46pub struct ParserRegistry {
47    parsers: Vec<Box<dyn FormatParser>>,
48}
49
50/// How [`ParserRegistry::resolve_detail`] selected a parser.
51enum Resolution {
52    /// Matched a declared file extension (the user named the format).
53    Extension,
54    /// Matched a content sniff at this confidence.
55    Sniff(Confidence),
56}
57
58impl ParserRegistry {
59    pub fn new() -> Self {
60        ParserRegistry {
61            parsers: Vec::new(),
62        }
63    }
64
65    pub fn register(&mut self, parser: Box<dyn FormatParser>) -> &mut Self {
66        self.parsers.push(parser);
67        self
68    }
69
70    /// Registered parser ids, in order (handy for `describe`/tests).
71    pub fn ids(&self) -> Vec<&'static str> {
72        self.parsers.iter().map(|p| p.id()).collect()
73    }
74
75    /// The lower-cased final extension of `source`, if any.
76    fn extension(source: &str) -> Option<String> {
77        source.rsplit('.').next().map(|e| e.to_ascii_lowercase())
78    }
79
80    /// How a parser was chosen — used to decide whether a downstream parse
81    /// failure is a real "malformed file" error or just a wrong content guess.
82    fn resolve_detail(
83        &self,
84        source: &str,
85        bytes: &[u8],
86    ) -> Result<(&dyn FormatParser, Resolution), AxError> {
87        if let Some(ext) = Self::extension(source) {
88            if let Some(p) = self
89                .parsers
90                .iter()
91                .find(|p| p.extensions().contains(&ext.as_str()))
92            {
93                return Ok((p.as_ref(), Resolution::Extension));
94            }
95        }
96        // Highest sniff confidence; strict `>` keeps the first registered winner.
97        let mut best: Option<(Confidence, &dyn FormatParser)> = None;
98        for p in &self.parsers {
99            if let Some(c) = p.sniff(bytes) {
100                if best.is_none_or(|(bc, _)| c > bc) {
101                    best = Some((c, p.as_ref()));
102                }
103            }
104        }
105        best.map(|(c, p)| (p, Resolution::Sniff(c)))
106            .ok_or_else(|| AxError::UnknownFormat(source.to_string()))
107    }
108
109    /// Resolves the parser for `source`/`bytes`: a matching file extension wins;
110    /// otherwise the highest-confidence content sniff (first registered on a
111    /// tie). An unrecognized stream is [`AxError::UnknownFormat`], never a guess.
112    pub fn resolve(&self, source: &str, bytes: &[u8]) -> Result<&dyn FormatParser, AxError> {
113        self.resolve_detail(source, bytes).map(|(p, _)| p)
114    }
115
116    /// Resolve, parse, and wrap into a [`RecordSet`] tagged with the parser id.
117    ///
118    /// When the parser was picked by a *weak* (`TEXT`/`FALLBACK`) content sniff
119    /// and then fails to parse, the content guess was simply wrong — so the
120    /// stream is reported as [`AxError::UnknownFormat`], not with that parser's
121    /// internal error (e.g. a plain-text file that merely starts with `[` is
122    /// "unrecognized", not "invalid JSON"). An explicit extension or a
123    /// `MAGIC`/`STRONG` signature that then fails is a genuine malformed-file
124    /// error and is surfaced as-is.
125    pub fn normalize(&self, source: &str, bytes: &[u8]) -> Result<RecordSet, AxError> {
126        let (parser, how) = self.resolve_detail(source, bytes)?;
127        match parser.parse(source, bytes) {
128            Ok(columns) => Ok(RecordSet::new(source, parser.id(), columns)),
129            Err(_) if matches!(how, Resolution::Sniff(c) if c < STRONG) => {
130                Err(AxError::UnknownFormat(source.to_string()))
131            }
132            Err(e) => Err(e),
133        }
134    }
135
136    /// Normalize with an explicitly chosen parser id (skips detection).
137    pub fn normalize_with(
138        &self,
139        id: &str,
140        source: &str,
141        bytes: &[u8],
142    ) -> Result<RecordSet, AxError> {
143        let parser = self
144            .parsers
145            .iter()
146            .find(|p| p.id() == id)
147            .ok_or_else(|| AxError::Config(format!("unknown format id '{id}'")))?;
148        Ok(RecordSet::new(
149            source,
150            parser.id(),
151            parser.parse(source, bytes)?,
152        ))
153    }
154}
155
156impl Default for ParserRegistry {
157    fn default() -> Self {
158        crate::parsers::default_registry()
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh default registry.
    fn reg() -> ParserRegistry {
        ParserRegistry::default()
    }

    /// Id of the parser the default registry resolves for `source`/`bytes`.
    fn resolved_id(source: &str, bytes: &[u8]) -> &'static str {
        reg().resolve(source, bytes).unwrap().id()
    }

    /// Checks every `(source, bytes, expected id)` row against the default registry.
    fn assert_resolves(cases: &[(&str, &[u8], &str)]) {
        for &(source, bytes, want) in cases {
            assert_eq!(resolved_id(source, bytes), want, "source {source:?}");
        }
    }

    #[test]
    fn extension_wins_over_content() {
        // The bytes look like JSON, yet the `.csv` extension decides.
        assert_eq!(resolved_id("data.csv", b"{\"a\":1}"), "csv");
    }

    #[test]
    fn sniff_used_without_a_known_extension() {
        assert_resolves(&[
            ("-", &b"a,b\n1,2"[..], "csv"),
            ("-", &b"a\tb\n1\t2"[..], "tsv"),
            ("-", &b"[{\"a\":1}]"[..], "json"),
            ("-", &b"{\"a\":1}\n{\"a\":2}\n"[..], "ndjson"),
        ]);
    }

    #[cfg(feature = "polars")]
    #[test]
    fn binary_magic_outranks_text_sniff() {
        assert_resolves(&[
            ("-", &b"PAR1\x00\x01x"[..], "parquet"),
            ("-", &b"ARROW1\x00\x00x"[..], "arrow"),
        ]);
    }

    #[test]
    fn csv_mentioning_par1_is_still_csv() {
        // Mentioning PAR1 inside the data does not make a CSV into Parquet.
        assert_eq!(resolved_id("-", b"a,b\nPAR1,2"), "csv");
    }

    #[test]
    fn unrecognized_stream_errors() {
        let registry = reg();
        let outcome = registry.resolve("-", &[0x00, 0x01, 0x02, 0xff]);
        assert!(matches!(outcome, Err(AxError::UnknownFormat(_))));
    }

    #[test]
    fn weak_sniff_parse_failure_is_unrecognized_not_misleading() {
        // A plain-text line that only starts with `[` (an Apache error_log,
        // say) is claimed by the JSON parser's cheap TEXT sniff. Once the JSON
        // parse fails, the guess was wrong, so the result must be
        // UnknownFormat rather than a confusing "invalid JSON" parse error.
        let outcome = reg().normalize("-", b"[Sun Dec 04 04:47:44 2005] [error] not json");
        assert!(
            matches!(outcome, Err(AxError::UnknownFormat(_))),
            "got {outcome:?}"
        );
    }

    #[test]
    fn malformed_input_under_a_claimed_extension_is_a_parse_error() {
        // When the format was confidently identified — here through the
        // `.json` extension — a parse failure is a genuine malformed-file
        // error and must not be masked as UnknownFormat.
        let outcome = reg().normalize("data.json", b"{not valid json");
        assert!(
            matches!(outcome, Err(AxError::Parse { .. })),
            "got {outcome:?}"
        );
    }

    #[test]
    fn malformed_input_under_a_strong_sniff_is_a_parse_error() {
        // A STRONG content signature that then fails to parse is a real
        // malformed-file error, not "unrecognized"; this pins the `< STRONG`
        // boundary. Zeek sniffs STRONG on `#separator` but needs `#fields`.
        let outcome = reg().normalize("-", b"#separator \\x09\n#path\tconn\n");
        assert!(
            matches!(outcome, Err(AxError::Parse { .. })),
            "got {outcome:?}"
        );
    }

    #[test]
    fn extension_overrides_content_sniff() {
        // The extension forces a parser even when the bytes would sniff as a
        // different format, which pins each parser's `extensions()`.
        assert_resolves(&[
            ("x.tsv", &b"a,b\n1,2"[..], "tsv"),
            ("x.tab", &b"a,b\n1,2"[..], "tsv"),
            ("x.json", &b"a,b"[..], "json"),
            ("x.jsonl", &b"a,b"[..], "ndjson"),
            ("x.csv", &b"a\tb"[..], "csv"),
        ]);
    }

    #[cfg(feature = "polars")]
    #[test]
    fn binary_extensions_resolve() {
        assert_resolves(&[
            ("x.parquet", &b"zz"[..], "parquet"),
            ("x.pq", &b"zz"[..], "parquet"),
            ("x.feather", &b"zz"[..], "arrow"),
            ("x.ipc", &b"zz"[..], "arrow"),
        ]);
    }

    #[test]
    fn default_registry_lists_all_formats() {
        // Registration order is the deterministic tie-break. The binary
        // readers are feature-gated, so the expected list is assembled from
        // whichever features are active.
        let mut expected: Vec<&str> = Vec::new();
        if cfg!(feature = "polars") {
            expected.extend(["parquet", "arrow"]);
        }
        if cfg!(feature = "evtx") {
            expected.push("evtx");
        }
        if cfg!(feature = "pcap") {
            expected.push("pcap");
        }
        if cfg!(feature = "xlsx") {
            expected.push("xlsx");
        }
        if cfg!(feature = "sqlite") {
            expected.push("sqlite");
        }
        if cfg!(feature = "datalake") {
            expected.extend(["avro", "orc"]);
        }
        // Text formats are always compiled in.
        expected.extend([
            "otlp",
            "cloudtrail",
            "eve",
            "journal",
            "osquery",
            "ndjson",
            "zeek",
            "logfmt",
            "accesslog",
            "syslog",
            "cef",
            "leef",
            "auditd",
            "dns",
            "prometheus",
            "xml",
            "json",
            "yaml",
            "toml",
            "ini",
            "netflow",
            "vpcflow",
            "tsv",
            "csv",
        ]);
        assert_eq!(reg().ids(), expected);
    }
}