1use ax_core::{AxError, Column, RecordSet};
14
/// Score a [`FormatParser::sniff`] implementation reports for a byte stream.
///
/// [`ParserRegistry::resolve`] compares these values directly and keeps the
/// parser with the highest one, so only their relative order matters there.
pub type Confidence = u16;

/// Highest predefined confidence level.
// NOTE(review): presumably for an unambiguous binary signature ("magic bytes") — confirm
// against the parser implementations.
pub const MAGIC: Confidence = 100;
/// Predefined level that ranks above [`TEXT`] and below [`MAGIC`].
// NOTE(review): presumably for a strong structural match — confirm against the parsers.
pub const STRONG: Confidence = 60;
/// Predefined level that ranks above [`FALLBACK`] and below [`STRONG`].
// NOTE(review): presumably for heuristic matches on plain-text formats — confirm.
pub const TEXT: Confidence = 50;
/// Lowest predefined confidence level; any other predefined level outranks it.
pub const FALLBACK: Confidence = 1;
28
/// A parser for a single input format.
///
/// Implementations are stored in a [`ParserRegistry`] as `Box<dyn FormatParser>`;
/// the `Send + Sync` supertraits are required of every implementation.
pub trait FormatParser: Send + Sync {
    /// Identifier of this format.
    ///
    /// The registry reports it from `ids()`, matches it against the `id` argument of
    /// `normalize_with`, and passes it to `RecordSet::new` as the format label.
    fn id(&self) -> &'static str;

    /// File extensions claimed by this format.
    ///
    /// The registry compares these against the lower-cased text after the last `.`
    /// of the source name, so entries need to be lower-case and without a leading
    /// dot in order to match.
    fn extensions(&self) -> &'static [&'static str];

    /// Inspects `bytes` and reports how confident this parser is that it can handle them.
    ///
    /// Returns `None` when the content is not recognised. Among parsers returning
    /// `Some`, the registry picks the one with the highest [`Confidence`].
    fn sniff(&self, bytes: &[u8]) -> Option<Confidence>;

    /// Parses `bytes` into columns.
    ///
    /// `source` is the name the registry was given for the input (the tests in this
    /// file use file names and `"-"`).
    ///
    /// # Errors
    ///
    /// Returns an [`AxError`] when parsing fails; the registry propagates it unchanged.
    fn parse(&self, source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError>;
}
44
/// An ordered collection of [`FormatParser`]s used to pick a parser for an input.
pub struct ParserRegistry {
    /// Registered parsers, in registration order. The order is observable: the
    /// first parser claiming an extension wins, and so does the first parser among
    /// equal sniff confidences.
    parsers: Vec<Box<dyn FormatParser>>,
}
49
50impl ParserRegistry {
51 pub fn new() -> Self {
52 ParserRegistry {
53 parsers: Vec::new(),
54 }
55 }
56
57 pub fn register(&mut self, parser: Box<dyn FormatParser>) -> &mut Self {
58 self.parsers.push(parser);
59 self
60 }
61
62 pub fn ids(&self) -> Vec<&'static str> {
64 self.parsers.iter().map(|p| p.id()).collect()
65 }
66
67 fn extension(source: &str) -> Option<String> {
69 source.rsplit('.').next().map(|e| e.to_ascii_lowercase())
70 }
71
72 pub fn resolve(&self, source: &str, bytes: &[u8]) -> Result<&dyn FormatParser, AxError> {
76 if let Some(ext) = Self::extension(source) {
77 if let Some(p) = self
78 .parsers
79 .iter()
80 .find(|p| p.extensions().contains(&ext.as_str()))
81 {
82 return Ok(p.as_ref());
83 }
84 }
85 let mut best: Option<(Confidence, &dyn FormatParser)> = None;
87 for p in &self.parsers {
88 if let Some(c) = p.sniff(bytes) {
89 if best.is_none_or(|(bc, _)| c > bc) {
90 best = Some((c, p.as_ref()));
91 }
92 }
93 }
94 best.map(|(_, p)| p)
95 .ok_or_else(|| AxError::UnknownFormat(source.to_string()))
96 }
97
98 pub fn normalize(&self, source: &str, bytes: &[u8]) -> Result<RecordSet, AxError> {
100 let parser = self.resolve(source, bytes)?;
101 let columns = parser.parse(source, bytes)?;
102 Ok(RecordSet::new(source, parser.id(), columns))
103 }
104
105 pub fn normalize_with(
107 &self,
108 id: &str,
109 source: &str,
110 bytes: &[u8],
111 ) -> Result<RecordSet, AxError> {
112 let parser = self
113 .parsers
114 .iter()
115 .find(|p| p.id() == id)
116 .ok_or_else(|| AxError::Config(format!("unknown format id '{id}'")))?;
117 Ok(RecordSet::new(
118 source,
119 parser.id(),
120 parser.parse(source, bytes)?,
121 ))
122 }
123}
124
/// The default registry is whatever `crate::parsers::default_registry()` builds,
/// not the empty registry returned by [`ParserRegistry::new`].
impl Default for ParserRegistry {
    /// Delegates to `crate::parsers::default_registry()`.
    fn default() -> Self {
        crate::parsers::default_registry()
    }
}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the registry under test via `ParserRegistry::default()`.
    fn reg() -> ParserRegistry {
        ParserRegistry::default()
    }

    /// A recognised extension decides the format even when the bytes look like
    /// a different one.
    #[test]
    fn extension_wins_over_content() {
        let r = reg();
        // JSON-looking payload, but the `.csv` extension takes precedence.
        let p = r.resolve("data.csv", b"{\"a\":1}").unwrap();
        assert_eq!(p.id(), "csv");
    }

    /// With the source `"-"` no extension matches, so the content sniffers decide.
    #[test]
    fn sniff_used_without_a_known_extension() {
        assert_eq!(reg().resolve("-", b"a,b\n1,2").unwrap().id(), "csv");
        assert_eq!(reg().resolve("-", b"a\tb\n1\t2").unwrap().id(), "tsv");
        assert_eq!(reg().resolve("-", b"[{\"a\":1}]").unwrap().id(), "json");
        assert_eq!(
            reg().resolve("-", b"{\"a\":1}\n{\"a\":2}\n").unwrap().id(),
            "ndjson"
        );
    }

    /// Streams that start with a binary signature resolve to the binary format.
    /// Only built when the `polars` feature is enabled.
    #[cfg(feature = "polars")]
    #[test]
    fn binary_magic_outranks_text_sniff() {
        assert_eq!(
            reg().resolve("-", b"PAR1\x00\x01x").unwrap().id(),
            "parquet"
        );
        assert_eq!(
            reg().resolve("-", b"ARROW1\x00\x00x").unwrap().id(),
            "arrow"
        );
    }

    /// `PAR1` appearing in the middle of CSV text must not flip the result.
    #[test]
    fn csv_mentioning_par1_is_still_csv() {
        // NOTE(review): presumably the parquet sniffer only accepts the marker at the
        // start of the stream — confirm against the parquet parser.
        assert_eq!(reg().resolve("-", b"a,b\nPAR1,2").unwrap().id(), "csv");
    }

    /// Bytes that no default parser is expected to recognise, with no usable
    /// extension, yield `AxError::UnknownFormat`.
    #[test]
    fn unrecognized_stream_errors() {
        assert!(matches!(
            reg().resolve("-", &[0x00, 0x01, 0x02, 0xff]),
            Err(AxError::UnknownFormat(_))
        ));
    }

    /// Each text extension maps to its format regardless of what the payload looks like.
    #[test]
    fn extension_overrides_content_sniff() {
        let r = reg();
        // Payloads are deliberately mismatched with the extension.
        assert_eq!(r.resolve("x.tsv", b"a,b\n1,2").unwrap().id(), "tsv");
        assert_eq!(r.resolve("x.tab", b"a,b\n1,2").unwrap().id(), "tsv");
        assert_eq!(r.resolve("x.json", b"a,b").unwrap().id(), "json");
        assert_eq!(r.resolve("x.jsonl", b"a,b").unwrap().id(), "ndjson");
        assert_eq!(r.resolve("x.csv", b"a\tb").unwrap().id(), "csv");
    }

    /// Binary-format extensions resolve without valid content (`b"zz"`).
    /// Only built when the `polars` feature is enabled.
    #[cfg(feature = "polars")]
    #[test]
    fn binary_extensions_resolve() {
        let r = reg();
        assert_eq!(r.resolve("x.parquet", b"zz").unwrap().id(), "parquet");
        assert_eq!(r.resolve("x.pq", b"zz").unwrap().id(), "parquet");
        assert_eq!(r.resolve("x.feather", b"zz").unwrap().id(), "arrow");
        assert_eq!(r.resolve("x.ipc", b"zz").unwrap().id(), "arrow");
    }

    /// Pins both the set and the order of ids in the default registry.
    ///
    /// `ids()` reports registration order, and that order breaks extension and
    /// sniff-confidence ties in `resolve`, so reordering is a behaviour change.
    #[test]
    fn default_registry_lists_all_formats() {
        let mut expected: Vec<&str> = Vec::new();
        // Feature-gated formats come first, in this order, when their feature is enabled.
        #[cfg(feature = "polars")]
        {
            expected.push("parquet");
            expected.push("arrow");
        }
        #[cfg(feature = "evtx")]
        expected.push("evtx");
        #[cfg(feature = "pcap")]
        expected.push("pcap");
        #[cfg(feature = "xlsx")]
        expected.push("xlsx");
        #[cfg(feature = "sqlite")]
        expected.push("sqlite");
        #[cfg(feature = "datalake")]
        {
            expected.push("avro");
            expected.push("orc");
        }
        // Formats expected regardless of enabled features.
        expected.extend([
            "otlp",
            "cloudtrail",
            "eve",
            "journal",
            "osquery",
            "ndjson",
            "zeek",
            "logfmt",
            "accesslog",
            "syslog",
            "cef",
            "leef",
            "auditd",
            "dns",
            "prometheus",
            "xml",
            "json",
            "yaml",
            "toml",
            "ini",
            "netflow",
            "vpcflow",
            "tsv",
            "csv",
        ]);
        assert_eq!(reg().ids(), expected);
    }
}