anomalyx_normalize/parsers/
delimited.rs1use crate::infer;
8use crate::parser::{Confidence, FormatParser, FALLBACK, TEXT};
9use ax_core::{AxError, Column, Value};
10
11fn read_delimited(bytes: &[u8], delim: u8, id: &str) -> Result<Vec<Column>, AxError> {
14 let mut rdr = csv::ReaderBuilder::new()
15 .delimiter(delim)
16 .flexible(true)
17 .has_headers(true)
18 .from_reader(bytes);
19
20 let err = |e: csv::Error| AxError::Parse {
21 format: id.to_string(),
22 message: e.to_string(),
23 };
24
25 let headers = rdr
26 .headers()
27 .map_err(err)?
28 .iter()
29 .map(|h| h.to_string())
30 .collect::<Vec<_>>();
31
32 let mut cols: Vec<Vec<Value>> = vec![Vec::new(); headers.len()];
33 for rec in rdr.records() {
34 let rec = rec.map_err(err)?;
35 for (i, col) in cols.iter_mut().enumerate() {
36 match rec.get(i) {
37 Some(field) => col.push(infer::infer_scalar(field)),
38 None => col.push(Value::Null),
39 }
40 }
41 }
42
43 Ok(headers
44 .into_iter()
45 .zip(cols)
46 .map(|(name, cells)| Column::new(name, cells))
47 .collect())
48}
49
/// Reports whether a tab occurs in `line` before any comma.
///
/// Returns `true` when the line contains a tab and either has no comma or its
/// first tab precedes its first comma; returns `false` otherwise (including
/// when the line contains neither character).
fn tab_before_comma(line: &str) -> bool {
    // The earliest tab-or-comma character decides the answer: it is a tab
    // exactly when the first tab comes before the first comma (or no comma exists).
    let first_delimiter = line.chars().find(|&ch| ch == '\t' || ch == ',');
    first_delimiter == Some('\t')
}
63
/// Returns the first non-blank line of `bytes` if the input could be tabular text.
///
/// Yields `None` when the bytes are not valid UTF-8, when nothing remains after
/// stripping leading whitespace, or when the first remaining character is `[`
/// or `{`. Otherwise yields the first line of the whitespace-trimmed text,
/// without its line terminator.
fn tabular_first_line(bytes: &[u8]) -> Option<&str> {
    let body = match std::str::from_utf8(bytes) {
        Ok(text) => text.trim_start(),
        Err(_) => return None,
    };

    match body.chars().next() {
        // Empty input, or an opening bracket/brace: not treated as delimited text.
        None | Some('[') | Some('{') => None,
        Some(_) => body.lines().next(),
    }
}
75
76#[derive(Debug, Default, Clone)]
77pub struct CsvParser;
78
79impl FormatParser for CsvParser {
80 fn id(&self) -> &'static str {
81 "csv"
82 }
83 fn extensions(&self) -> &'static [&'static str] {
84 &["csv"]
85 }
86 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
87 tabular_first_line(bytes).map(|_| FALLBACK)
89 }
90 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
91 read_delimited(bytes, b',', self.id())
92 }
93}
94
95#[derive(Debug, Default, Clone)]
96pub struct TsvParser;
97
98impl FormatParser for TsvParser {
99 fn id(&self) -> &'static str {
100 "tsv"
101 }
102 fn extensions(&self) -> &'static [&'static str] {
103 &["tsv", "tab"]
104 }
105 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
106 tabular_first_line(bytes)
107 .filter(|l| tab_before_comma(l))
108 .map(|_| TEXT)
109 }
110 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
111 read_delimited(bytes, b'\t', self.id())
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    #[test]
    fn tab_before_comma_logic() {
        // A tab comes first, or is the only delimiter present.
        assert!(tab_before_comma("a\tb,c"));
        assert!(tab_before_comma("a\tb\tc"));
        // A comma comes first, or there is no tab at all.
        assert!(!tab_before_comma("a,b\tc"));
        assert!(!tab_before_comma("a,b,c"));
    }

    #[test]
    fn csv_roundtrip_types_and_nulls() {
        let columns = CsvParser.parse("t.csv", b"a,b\n1,x\n2,\n3,z").unwrap();
        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].ty, ColType::Int);
        // The empty second field of the `2,` row is expected to be the only null.
        assert_eq!(columns[1].null_count(), 1);
    }

    #[test]
    fn ragged_csv_pads_and_truncates() {
        let columns = CsvParser.parse("t.csv", b"a,b\n1\n2,3,4").unwrap();
        // The short first row is padded with a null in column `b`.
        assert_eq!(columns[1].cells[0], Value::Null);
        // The extra field in the second row is dropped; both rows are kept.
        assert_eq!(columns[0].len(), 2);
    }

    #[test]
    fn tsv_parses_tab_delimited() {
        let columns = TsvParser.parse("t.tsv", b"a\tb\n1\t2").unwrap();
        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].ty, ColType::Int);
    }

    #[test]
    fn sniff_confidences() {
        assert_eq!(CsvParser.sniff(b"a,b\n1,2"), Some(FALLBACK));
        assert_eq!(TsvParser.sniff(b"a\tb\n1\t2"), Some(TEXT));
        // Comma-only input is not claimed by the TSV parser.
        assert_eq!(TsvParser.sniff(b"a,b\n1,2"), None);
        // Input opening with a bracket is rejected.
        assert_eq!(CsvParser.sniff(b"[1,2]"), None);
        // Bytes that are not valid UTF-8 are rejected.
        assert_eq!(CsvParser.sniff(&[0xff, 0xfe]), None);
    }
}