1use ax_core::{AxError, Column, RecordSet};
14
/// Score returned by [`FormatParser::sniff`] for a byte sample.
///
/// When the registry resolves a parser by content, the parser reporting the
/// highest score is chosen.
pub type Confidence = u16;

/// Highest tier defined in this file.
/// NOTE(review): the name suggests an unambiguous magic-number match — confirm
/// against the parsers that return it.
pub const MAGIC: Confidence = 100;
/// Threshold consulted by `ParserRegistry::normalize`: when a parser chosen by
/// a sniff scoring *below* this value fails to parse, the failure is reported
/// as `AxError::UnknownFormat` instead of the parser's own error.
pub const STRONG: Confidence = 60;
/// A tier below [`STRONG`], so a sniff at this level counts as weak in
/// `ParserRegistry::normalize`.
/// NOTE(review): presumably used for plain-text heuristics — confirm.
pub const TEXT: Confidence = 50;
/// Lowest tier defined in this file.
/// NOTE(review): presumably a last-resort guess — confirm.
pub const FALLBACK: Confidence = 1;
28
/// A parser for one input format, stored by `ParserRegistry` as a boxed trait
/// object.
///
/// Implementors must be `Send + Sync`.
pub trait FormatParser: Send + Sync {
    /// Identifier of the format. The registry lists it in `ids`, matches it
    /// against the `id` argument of `normalize_with`, and passes it to
    /// `RecordSet::new` after a successful parse.
    fn id(&self) -> &'static str;

    /// File extensions this parser claims.
    ///
    /// The registry compares these against the ASCII-lowercased text after the
    /// last `.` of the source name, so entries need to be lowercase and
    /// written without a leading dot in order to match.
    fn extensions(&self) -> &'static [&'static str];

    /// Inspects `bytes` and returns `Some(confidence)` when the content looks
    /// like this format, or `None` when the parser does not recognize it.
    fn sniff(&self, bytes: &[u8]) -> Option<Confidence>;

    /// Parses `bytes` into columns, or returns an `AxError` on failure.
    ///
    /// `source` is the name the input was given to the registry under.
    /// NOTE(review): presumably used for error reporting — confirm with the
    /// implementations.
    fn parse(&self, source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError>;
}
44
/// An ordered collection of [`FormatParser`]s used to pick a parser for an
/// input, first by file extension and otherwise by content sniffing.
pub struct ParserRegistry {
    /// Parsers in registration order. The order is significant: the first
    /// parser claiming an extension wins, and when two parsers sniff with the
    /// same confidence the earlier one is kept.
    parsers: Vec<Box<dyn FormatParser>>,
}
49
/// Records how `ParserRegistry::resolve_detail` arrived at its choice.
enum Resolution {
    /// The parser claimed the source name's extension.
    Extension,
    /// The parser was chosen by content sniffing, with the given confidence.
    Sniff(Confidence),
}
57
58impl ParserRegistry {
59 pub fn new() -> Self {
60 ParserRegistry {
61 parsers: Vec::new(),
62 }
63 }
64
65 pub fn register(&mut self, parser: Box<dyn FormatParser>) -> &mut Self {
66 self.parsers.push(parser);
67 self
68 }
69
70 pub fn ids(&self) -> Vec<&'static str> {
72 self.parsers.iter().map(|p| p.id()).collect()
73 }
74
75 fn extension(source: &str) -> Option<String> {
77 source.rsplit('.').next().map(|e| e.to_ascii_lowercase())
78 }
79
80 fn resolve_detail(
83 &self,
84 source: &str,
85 bytes: &[u8],
86 ) -> Result<(&dyn FormatParser, Resolution), AxError> {
87 if let Some(ext) = Self::extension(source) {
88 if let Some(p) = self
89 .parsers
90 .iter()
91 .find(|p| p.extensions().contains(&ext.as_str()))
92 {
93 return Ok((p.as_ref(), Resolution::Extension));
94 }
95 }
96 let mut best: Option<(Confidence, &dyn FormatParser)> = None;
98 for p in &self.parsers {
99 if let Some(c) = p.sniff(bytes) {
100 if best.is_none_or(|(bc, _)| c > bc) {
101 best = Some((c, p.as_ref()));
102 }
103 }
104 }
105 best.map(|(c, p)| (p, Resolution::Sniff(c)))
106 .ok_or_else(|| AxError::UnknownFormat(source.to_string()))
107 }
108
109 pub fn resolve(&self, source: &str, bytes: &[u8]) -> Result<&dyn FormatParser, AxError> {
113 self.resolve_detail(source, bytes).map(|(p, _)| p)
114 }
115
116 pub fn normalize(&self, source: &str, bytes: &[u8]) -> Result<RecordSet, AxError> {
126 let (parser, how) = self.resolve_detail(source, bytes)?;
127 match parser.parse(source, bytes) {
128 Ok(columns) => Ok(RecordSet::new(source, parser.id(), columns)),
129 Err(_) if matches!(how, Resolution::Sniff(c) if c < STRONG) => {
130 Err(AxError::UnknownFormat(source.to_string()))
131 }
132 Err(e) => Err(e),
133 }
134 }
135
136 pub fn normalize_with(
138 &self,
139 id: &str,
140 source: &str,
141 bytes: &[u8],
142 ) -> Result<RecordSet, AxError> {
143 let parser = self
144 .parsers
145 .iter()
146 .find(|p| p.id() == id)
147 .ok_or_else(|| AxError::Config(format!("unknown format id '{id}'")))?;
148 Ok(RecordSet::new(
149 source,
150 parser.id(),
151 parser.parse(source, bytes)?,
152 ))
153 }
154}
155
/// The default registry is the one built by `crate::parsers::default_registry()`,
/// not the empty registry returned by `ParserRegistry::new()`.
impl Default for ParserRegistry {
    /// Delegates to `crate::parsers::default_registry()`.
    fn default() -> Self {
        crate::parsers::default_registry()
    }
}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the default registry used by every test in this module.
    fn reg() -> ParserRegistry {
        ParserRegistry::default()
    }

    /// Resolves `bytes` under the extensionless source name `"-"` against a
    /// fresh default registry and returns the chosen parser's id.
    fn sniffed_id(bytes: &[u8]) -> &'static str {
        reg().resolve("-", bytes).unwrap().id()
    }

    #[test]
    fn extension_wins_over_content() {
        // The content is JSON, but the `.csv` extension decides.
        let registry = reg();
        let chosen = registry.resolve("data.csv", b"{\"a\":1}").unwrap();
        assert_eq!(chosen.id(), "csv");
    }

    #[test]
    fn sniff_used_without_a_known_extension() {
        let cases: [(&[u8], &str); 4] = [
            (b"a,b\n1,2", "csv"),
            (b"a\tb\n1\t2", "tsv"),
            (b"[{\"a\":1}]", "json"),
            (b"{\"a\":1}\n{\"a\":2}\n", "ndjson"),
        ];
        for (bytes, want) in cases {
            assert_eq!(sniffed_id(bytes), want);
        }
    }

    #[cfg(feature = "polars")]
    #[test]
    fn binary_magic_outranks_text_sniff() {
        assert_eq!(sniffed_id(b"PAR1\x00\x01x"), "parquet");
        assert_eq!(sniffed_id(b"ARROW1\x00\x00x"), "arrow");
    }

    #[test]
    fn csv_mentioning_par1_is_still_csv() {
        // `PAR1` appears inside the data, not at the start of the stream.
        assert_eq!(sniffed_id(b"a,b\nPAR1,2"), "csv");
    }

    #[test]
    fn unrecognized_stream_errors() {
        let registry = reg();
        let outcome = registry.resolve("-", &[0x00, 0x01, 0x02, 0xff]);
        assert!(matches!(outcome, Err(AxError::UnknownFormat(_))));
    }

    #[test]
    fn weak_sniff_parse_failure_is_unrecognized_not_misleading() {
        let r = reg().normalize("-", b"[Sun Dec 04 04:47:44 2005] [error] not json");
        let is_unknown_format = matches!(r, Err(AxError::UnknownFormat(_)));
        assert!(is_unknown_format, "got {r:?}");
    }

    #[test]
    fn malformed_input_under_a_claimed_extension_is_a_parse_error() {
        let r = reg().normalize("data.json", b"{not valid json");
        let is_parse_error = matches!(r, Err(AxError::Parse { .. }));
        assert!(is_parse_error, "got {r:?}");
    }

    #[test]
    fn malformed_input_under_a_strong_sniff_is_a_parse_error() {
        let r = reg().normalize("-", b"#separator \\x09\n#path\tconn\n");
        let is_parse_error = matches!(r, Err(AxError::Parse { .. }));
        assert!(is_parse_error, "got {r:?}");
    }

    #[test]
    fn extension_overrides_content_sniff() {
        let registry = reg();
        // (source name, content that would sniff as something else, expected id)
        let cases: [(&str, &[u8], &str); 5] = [
            ("x.tsv", b"a,b\n1,2", "tsv"),
            ("x.tab", b"a,b\n1,2", "tsv"),
            ("x.json", b"a,b", "json"),
            ("x.jsonl", b"a,b", "ndjson"),
            ("x.csv", b"a\tb", "csv"),
        ];
        for (source, bytes, want) in cases {
            assert_eq!(registry.resolve(source, bytes).unwrap().id(), want);
        }
    }

    #[cfg(feature = "polars")]
    #[test]
    fn binary_extensions_resolve() {
        let registry = reg();
        let cases = [
            ("x.parquet", "parquet"),
            ("x.pq", "parquet"),
            ("x.feather", "arrow"),
            ("x.ipc", "arrow"),
        ];
        for (source, want) in cases {
            assert_eq!(registry.resolve(source, b"zz").unwrap().id(), want);
        }
    }

    #[test]
    fn default_registry_lists_all_formats() {
        // Feature-gated formats come first, in this exact order, followed by
        // the formats that are always compiled in.
        let mut expected: Vec<&str> = Vec::new();
        #[cfg(feature = "polars")]
        expected.extend(["parquet", "arrow"]);
        #[cfg(feature = "evtx")]
        expected.extend(["evtx"]);
        #[cfg(feature = "pcap")]
        expected.extend(["pcap"]);
        #[cfg(feature = "xlsx")]
        expected.extend(["xlsx"]);
        #[cfg(feature = "sqlite")]
        expected.extend(["sqlite"]);
        #[cfg(feature = "datalake")]
        expected.extend(["avro", "orc"]);

        let always_on = [
            "otlp",
            "cloudtrail",
            "eve",
            "journal",
            "osquery",
            "ndjson",
            "zeek",
            "logfmt",
            "accesslog",
            "syslog",
            "cef",
            "leef",
            "auditd",
            "dns",
            "prometheus",
            "xml",
            "json",
            "yaml",
            "toml",
            "ini",
            "netflow",
            "vpcflow",
            "tsv",
            "csv",
        ];
        expected.extend(always_on);

        let actual = reg().ids();
        assert_eq!(actual, expected);
    }
}