use ax_core::{AxError, Column, RecordSet};
/// Score a parser returns from [`FormatParser::sniff`] to say how strongly it
/// believes the bytes are in its format. When no file extension matches, the
/// registry picks the parser that reported the highest score.
pub type Confidence = u16;
/// Highest of the predefined scores.
// NOTE(review): presumably meant for magic-byte signature matches — confirm
// against the parsers that return it.
pub const MAGIC: Confidence = 100;
/// Boundary between a weak and a strong sniff: `ParserRegistry::normalize`
/// rewrites a parse failure to `AxError::UnknownFormat` only when the winning
/// sniff scored strictly below this value.
pub const STRONG: Confidence = 60;
/// A score below [`STRONG`], so a parse failure after winning with it is
/// reported as an unknown format rather than as a parse error.
// NOTE(review): presumably meant for plain-text heuristics — confirm against
// the parsers that return it.
pub const TEXT: Confidence = 50;
/// Lowest of the predefined scores; any other predefined score outranks it.
// NOTE(review): presumably a last-resort guess — confirm against the parsers
// that return it.
pub const FALLBACK: Confidence = 1;
/// A pluggable parser for one input format.
///
/// Implementations are stored as `Box<dyn FormatParser>` inside
/// [`ParserRegistry`], which selects one by file extension or by content
/// sniffing and then calls [`FormatParser::parse`].
pub trait FormatParser: Send + Sync {
/// Identifier of the format. The registry uses it to look a parser up by
/// name (`normalize_with`) and passes it to `RecordSet::new`.
fn id(&self) -> &'static str;
/// Extensions (without the dot) that select this parser. The registry
/// compares them against an ASCII-lower-cased extension, so only lower-case
/// entries can match.
fn extensions(&self) -> &'static [&'static str];
/// Inspects the raw bytes and returns `Some(score)` when they could be in
/// this format, or `None` to stay out of the running.
fn sniff(&self, bytes: &[u8]) -> Option<Confidence>;
/// Parses `bytes` into columns. `source` is the name the input came from.
///
/// # Errors
/// Returns an [`AxError`] when the input cannot be parsed.
fn parse(&self, source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError>;
}
/// An ordered collection of [`FormatParser`]s.
///
/// Registration order is significant: when several parsers declare the same
/// extension the first registered one is used, and when several parsers sniff
/// with the same score the earliest registered one wins.
pub struct ParserRegistry {
// Parsers in registration order.
parsers: Vec<Box<dyn FormatParser>>,
}
/// How a parser was chosen for an input; `normalize` uses this to decide how
/// to report a parse failure.
enum Resolution {
/// The source name's extension matched one of the parser's extensions.
Extension,
/// No extension matched; the parser won content sniffing with this score.
Sniff(Confidence),
}
impl ParserRegistry {
pub fn new() -> Self {
ParserRegistry {
parsers: Vec::new(),
}
}
/// Appends `parser` after all previously registered parsers and returns the
/// registry again so calls can be chained.
pub fn register(&mut self, parser: Box<dyn FormatParser>) -> &mut Self {
    let registered = &mut self.parsers;
    registered.push(parser);
    self
}
/// Returns the id of every registered parser, in registration order.
pub fn ids(&self) -> Vec<&'static str> {
    let mut ids = Vec::with_capacity(self.parsers.len());
    for parser in &self.parsers {
        ids.push(parser.id());
    }
    ids
}
/// Extracts the ASCII-lower-cased extension of `source`: the text after its
/// last `.`.
///
/// Returns `None` when `source` has no usable extension:
/// - it contains no `.` at all (e.g. `"-"` or a bare `"csv"`), so a dotless
///   name is never mistaken for an extension and falls through to sniffing;
/// - the text after the last `.` is empty (`"name."`);
/// - the last `.` belongs to a directory component (`"./data"`), i.e. the
///   suffix still contains a path separator.
fn extension(source: &str) -> Option<String> {
    // `rsplit('.').next()` would yield the whole string when there is no dot;
    // `rsplit_once` only succeeds when a dot is actually present.
    let (_, ext) = source.rsplit_once('.')?;
    if ext.is_empty() || ext.contains(['/', '\\']) {
        return None;
    }
    Some(ext.to_ascii_lowercase())
}
/// Picks the parser for `source`/`bytes` and reports how it was picked.
///
/// An extension match takes priority: the first registered parser that lists
/// the source's extension is returned without looking at the content.
/// Otherwise every parser sniffs `bytes` and the highest score wins; on equal
/// scores the earliest registered parser is kept.
///
/// # Errors
/// Returns `AxError::UnknownFormat` (carrying `source`) when no extension
/// matches and no parser claims the bytes.
fn resolve_detail(
    &self,
    source: &str,
    bytes: &[u8],
) -> Result<(&dyn FormatParser, Resolution), AxError> {
    let by_extension = Self::extension(source).and_then(|ext| {
        self.parsers
            .iter()
            .find(|candidate| candidate.extensions().contains(&ext.as_str()))
    });
    if let Some(parser) = by_extension {
        return Ok((parser.as_ref(), Resolution::Extension));
    }

    let mut winner: Option<(Confidence, &dyn FormatParser)> = None;
    for candidate in &self.parsers {
        let Some(score) = candidate.sniff(bytes) else {
            continue;
        };
        match winner {
            // Only a strictly higher score displaces the current leader.
            Some((top, _)) if score <= top => {}
            _ => winner = Some((score, candidate.as_ref())),
        }
    }

    match winner {
        Some((score, parser)) => Ok((parser, Resolution::Sniff(score))),
        None => Err(AxError::UnknownFormat(source.to_string())),
    }
}
/// Returns the parser that would handle `source`/`bytes`, chosen by extension
/// first and by content sniffing otherwise.
///
/// # Errors
/// Returns `AxError::UnknownFormat` when no parser can be selected.
pub fn resolve(&self, source: &str, bytes: &[u8]) -> Result<&dyn FormatParser, AxError> {
    let (parser, _how) = self.resolve_detail(source, bytes)?;
    Ok(parser)
}
/// Selects a parser for the input, parses it, and wraps the columns in a
/// `RecordSet` tagged with `source` and the parser's id.
///
/// # Errors
/// - `AxError::UnknownFormat` when no parser can be selected, or when the
///   parser was only a weak guess (sniff score below `STRONG`) and parsing
///   failed; the parser's own error is discarded in that case.
/// - Otherwise the parser's error is returned unchanged.
pub fn normalize(&self, source: &str, bytes: &[u8]) -> Result<RecordSet, AxError> {
    let (parser, how) = self.resolve_detail(source, bytes)?;
    let weak_guess = match how {
        Resolution::Extension => false,
        Resolution::Sniff(confidence) => confidence < STRONG,
    };
    parser
        .parse(source, bytes)
        .map(|columns| RecordSet::new(source, parser.id(), columns))
        .map_err(|err| {
            if weak_guess {
                AxError::UnknownFormat(source.to_string())
            } else {
                err
            }
        })
}
/// Parses the input with the parser whose id equals `id`, skipping extension
/// and sniff-based detection entirely.
///
/// # Errors
/// - `AxError::Config` when no registered parser has that id.
/// - The parser's own error when parsing fails.
pub fn normalize_with(
    &self,
    id: &str,
    source: &str,
    bytes: &[u8],
) -> Result<RecordSet, AxError> {
    let Some(parser) = self.parsers.iter().find(|candidate| candidate.id() == id) else {
        return Err(AxError::Config(format!("unknown format id '{id}'")));
    };
    let format_id = parser.id();
    let columns = parser.parse(source, bytes)?;
    Ok(RecordSet::new(source, format_id, columns))
}
}
/// The default registry is whatever `crate::parsers::default_registry()`
/// builds; note that this differs from [`ParserRegistry::new`], which starts
/// with no parsers at all.
impl Default for ParserRegistry {
fn default() -> Self {
crate::parsers::default_registry()
}
}
// Tests run against the default registry, so they exercise the real parsers
// and their registration order rather than stubs.
#[cfg(test)]
mod tests {
use super::*;
// Shorthand for the default registry used by every test.
fn reg() -> ParserRegistry {
ParserRegistry::default()
}
// A `.csv` name selects the csv parser even though the bytes are JSON.
#[test]
fn extension_wins_over_content() {
let r = reg();
let p = r.resolve("data.csv", b"{\"a\":1}").unwrap();
assert_eq!(p.id(), "csv");
}
// With the source name "-" no extension matches, so the content decides.
#[test]
fn sniff_used_without_a_known_extension() {
assert_eq!(reg().resolve("-", b"a,b\n1,2").unwrap().id(), "csv");
assert_eq!(reg().resolve("-", b"a\tb\n1\t2").unwrap().id(), "tsv");
assert_eq!(reg().resolve("-", b"[{\"a\":1}]").unwrap().id(), "json");
assert_eq!(
reg().resolve("-", b"{\"a\":1}\n{\"a\":2}\n").unwrap().id(),
"ndjson"
);
}
// Inputs starting with the PAR1 / ARROW1 signatures resolve to the binary
// parsers. Only compiled when the `polars` feature is enabled.
#[cfg(feature = "polars")]
#[test]
fn binary_magic_outranks_text_sniff() {
assert_eq!(
reg().resolve("-", b"PAR1\x00\x01x").unwrap().id(),
"parquet"
);
assert_eq!(
reg().resolve("-", b"ARROW1\x00\x00x").unwrap().id(),
"arrow"
);
}
// "PAR1" appearing in the middle of CSV text must not select parquet.
#[test]
fn csv_mentioning_par1_is_still_csv() {
assert_eq!(reg().resolve("-", b"a,b\nPAR1,2").unwrap().id(), "csv");
}
// Bytes that no parser claims produce `UnknownFormat`.
#[test]
fn unrecognized_stream_errors() {
assert!(matches!(
reg().resolve("-", &[0x00, 0x01, 0x02, 0xff]),
Err(AxError::UnknownFormat(_))
));
}
// When the parser was only a weak sniff guess, a parse failure is reported
// as `UnknownFormat` instead of that parser's own error.
#[test]
fn weak_sniff_parse_failure_is_unrecognized_not_misleading() {
let r = reg().normalize("-", b"[Sun Dec 04 04:47:44 2005] [error] not json");
assert!(matches!(r, Err(AxError::UnknownFormat(_))), "got {r:?}");
}
// When the extension picked the parser, its parse error is surfaced as-is.
#[test]
fn malformed_input_under_a_claimed_extension_is_a_parse_error() {
let r = reg().normalize("data.json", b"{not valid json");
assert!(matches!(r, Err(AxError::Parse { .. })), "got {r:?}");
}
// A strong sniff match also surfaces the parser's own parse error.
// NOTE(review): the `#separator`/`#path` header presumably targets the zeek
// parser — confirm against its sniff implementation.
#[test]
fn malformed_input_under_a_strong_sniff_is_a_parse_error() {
let r = reg().normalize("-", b"#separator \\x09\n#path\tconn\n");
assert!(matches!(r, Err(AxError::Parse { .. })), "got {r:?}");
}
// Each known text extension selects its parser regardless of the content.
#[test]
fn extension_overrides_content_sniff() {
let r = reg();
assert_eq!(r.resolve("x.tsv", b"a,b\n1,2").unwrap().id(), "tsv");
assert_eq!(r.resolve("x.tab", b"a,b\n1,2").unwrap().id(), "tsv");
assert_eq!(r.resolve("x.json", b"a,b").unwrap().id(), "json");
assert_eq!(r.resolve("x.jsonl", b"a,b").unwrap().id(), "ndjson");
assert_eq!(r.resolve("x.csv", b"a\tb").unwrap().id(), "csv");
}
// Binary-format extensions resolve without valid content. Only compiled
// when the `polars` feature is enabled.
#[cfg(feature = "polars")]
#[test]
fn binary_extensions_resolve() {
let r = reg();
assert_eq!(r.resolve("x.parquet", b"zz").unwrap().id(), "parquet");
assert_eq!(r.resolve("x.pq", b"zz").unwrap().id(), "parquet");
assert_eq!(r.resolve("x.feather", b"zz").unwrap().id(), "arrow");
assert_eq!(r.resolve("x.ipc", b"zz").unwrap().id(), "arrow");
}
// Pins the exact ids AND their order in the default registry. `ids()`
// returns parsers in registration order, so this list has to mirror the
// order in which the default registry registers them; feature-gated formats
// are only expected when their feature is enabled.
#[test]
fn default_registry_lists_all_formats() {
let mut expected: Vec<&str> = Vec::new();
#[cfg(feature = "polars")]
{
expected.push("parquet");
expected.push("arrow");
}
#[cfg(feature = "evtx")]
expected.push("evtx");
#[cfg(feature = "pcap")]
expected.push("pcap");
#[cfg(feature = "xlsx")]
expected.push("xlsx");
#[cfg(feature = "sqlite")]
expected.push("sqlite");
#[cfg(feature = "datalake")]
{
expected.push("avro");
expected.push("orc");
}
// Formats that are expected regardless of enabled features.
expected.extend([
"otlp",
"cloudtrail",
"eve",
"journal",
"osquery",
"ndjson",
"zeek",
"logfmt",
"accesslog",
"syslog",
"cef",
"leef",
"auditd",
"dns",
"prometheus",
"xml",
"json",
"yaml",
"toml",
"ini",
"netflow",
"vpcflow",
"tsv",
"csv",
]);
assert_eq!(reg().ids(), expected);
}
}