1use ax_core::{AxError, Column, RecordSet, Value};
15use std::collections::BTreeMap;
16
17#[cfg(feature = "polars")]
18pub mod binary;
19pub mod format;
20pub mod infer;
21
22pub use format::Format;
23
24fn read_binary(fmt: Format, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
28 #[cfg(feature = "polars")]
29 {
30 binary::read(fmt, bytes)
31 }
32 #[cfg(not(feature = "polars"))]
33 {
34 let _ = bytes;
35 Err(AxError::Config(format!(
36 "{} requires the 'polars' feature, which was not built",
37 fmt.token()
38 )))
39 }
40}
41
42pub fn normalize(source: &str, bytes: &[u8]) -> Result<RecordSet, AxError> {
45 let fmt = Format::resolve(source, bytes)?;
46 normalize_as(source, bytes, fmt)
47}
48
49pub fn normalize_as(source: &str, bytes: &[u8], fmt: Format) -> Result<RecordSet, AxError> {
51 let columns = match fmt {
52 Format::Csv => read_delimited(bytes, b',', fmt)?,
53 Format::Tsv => read_delimited(bytes, b'\t', fmt)?,
54 Format::Ndjson => read_ndjson(bytes, fmt)?,
55 Format::Json => read_json(bytes, fmt)?,
56 Format::Parquet | Format::Arrow => read_binary(fmt, bytes)?,
57 };
58 Ok(RecordSet::new(source, fmt.token(), columns))
59}
60
61fn read_delimited(bytes: &[u8], delim: u8, fmt: Format) -> Result<Vec<Column>, AxError> {
64 let mut rdr = csv::ReaderBuilder::new()
65 .delimiter(delim)
66 .flexible(true)
67 .has_headers(true)
68 .from_reader(bytes);
69
70 let headers = rdr
71 .headers()
72 .map_err(|e| parse_err(fmt, e))?
73 .iter()
74 .map(|h| h.to_string())
75 .collect::<Vec<_>>();
76
77 let mut cols: Vec<Vec<Value>> = vec![Vec::new(); headers.len()];
78 for rec in rdr.records() {
79 let rec = rec.map_err(|e| parse_err(fmt, e))?;
80 for (i, col) in cols.iter_mut().enumerate() {
81 match rec.get(i) {
82 Some(field) => col.push(infer::infer_scalar(field)),
83 None => col.push(Value::Null),
84 }
85 }
86 }
87
88 Ok(headers
89 .into_iter()
90 .zip(cols)
91 .map(|(name, cells)| Column::new(name, cells))
92 .collect())
93}
94
95fn read_ndjson(bytes: &[u8], fmt: Format) -> Result<Vec<Column>, AxError> {
98 let text = std::str::from_utf8(bytes).map_err(|e| parse_err(fmt, e))?;
99 let mut builder = TableBuilder::new();
100 for (lineno, line) in text.lines().enumerate() {
101 if line.trim().is_empty() {
102 continue;
103 }
104 let val: serde_json::Value = serde_json::from_str(line).map_err(|e| AxError::Parse {
105 format: fmt.token().to_string(),
106 message: format!("line {}: {e}", lineno + 1),
107 })?;
108 builder.push_value(val);
109 }
110 Ok(builder.finish())
111}
112
113fn read_json(bytes: &[u8], fmt: Format) -> Result<Vec<Column>, AxError> {
116 let val: serde_json::Value = serde_json::from_slice(bytes).map_err(|e| parse_err(fmt, e))?;
117 let mut builder = TableBuilder::new();
118 match val {
119 serde_json::Value::Array(items) => {
120 for item in items {
121 builder.push_value(item);
122 }
123 }
124 other => builder.push_value(other),
125 }
126 Ok(builder.finish())
127}
128
129const VALUE_COL: &str = "value";
130
131struct TableBuilder {
134 order: Vec<String>,
135 index: BTreeMap<String, usize>,
136 cols: Vec<Vec<Value>>,
137 rows: usize,
138}
139
140impl TableBuilder {
141 fn new() -> Self {
142 TableBuilder {
143 order: Vec::new(),
144 index: BTreeMap::new(),
145 cols: Vec::new(),
146 rows: 0,
147 }
148 }
149
150 fn ensure(&mut self, name: &str) -> usize {
152 if let Some(&i) = self.index.get(name) {
153 return i;
154 }
155 let i = self.order.len();
156 self.order.push(name.to_string());
157 self.index.insert(name.to_string(), i);
158 self.cols.push(vec![Value::Null; self.rows]);
159 i
160 }
161
162 fn push_value(&mut self, val: serde_json::Value) {
165 let mut row: BTreeMap<String, Value> = BTreeMap::new();
166 match val {
167 serde_json::Value::Object(map) => {
168 for (k, v) in map {
169 row.insert(k, infer::json_to_value(&v));
170 }
171 }
172 other => {
173 row.insert(VALUE_COL.to_string(), infer::json_to_value(&other));
174 }
175 }
176 for k in row.keys() {
177 self.ensure(k);
178 }
179 for (name, &i) in &self.index {
180 let cell = row.remove(name).unwrap_or(Value::Null);
181 self.cols[i].push(cell);
182 }
183 self.rows += 1;
184 }
185
186 fn finish(self) -> Vec<Column> {
187 self.order
188 .into_iter()
189 .zip(self.cols)
190 .map(|(name, cells)| Column::new(name, cells))
191 .collect()
192 }
193}
194
195fn parse_err(fmt: Format, e: impl std::fmt::Display) -> AxError {
196 AxError::Parse {
197 format: fmt.token().to_string(),
198 message: e.to_string(),
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    #[test]
    fn csv_roundtrip_types_and_nulls() {
        let rs = normalize("t.csv", b"a,b\n1,x\n2,\n3,z").unwrap();
        assert_eq!(rs.width(), 2);
        assert_eq!(rs.rows(), 3);
        let a = rs.column("a").unwrap();
        assert_eq!(a.ty, ColType::Int);
        let b = rs.column("b").unwrap();
        assert_eq!(b.null_count(), 1);
    }

    #[test]
    fn ndjson_key_union_pads_missing() {
        let rs = normalize("-", b"{\"a\":1}\n{\"a\":2,\"b\":9}\n").unwrap();
        assert_eq!(rs.rows(), 2);
        let b = rs.column("b").unwrap();
        assert_eq!(b.null_count(), 1);
    }

    #[test]
    fn json_array_of_objects() {
        let rs = normalize("d.json", br#"[{"x":10},{"x":20},{"x":30}]"#).unwrap();
        assert_eq!(rs.rows(), 3);
        assert_eq!(rs.column("x").unwrap().ty, ColType::Int);
    }

    #[test]
    fn json_scalar_goes_to_value_column() {
        let rs = normalize("d.json", b"[1,2,3]").unwrap();
        assert_eq!(rs.column("value").unwrap().numeric(), vec![1.0, 2.0, 3.0]);
    }

    #[test]
    fn unknown_format_errors() {
        assert!(normalize("-", &[0x00, 0x01, 0x02, 0xff]).is_err());
    }

    #[test]
    fn ragged_csv_pads_and_truncates() {
        let rs = normalize("t.csv", b"a,b\n1\n2,3,4").unwrap();
        assert_eq!(rs.rows(), 2);
        assert_eq!(rs.column("b").unwrap().cells[0], Value::Null);
    }

    // The tests below pin the format explicitly via `normalize_as`, so they
    // exercise each reader without depending on format auto-detection.

    #[test]
    fn tsv_splits_on_tabs() {
        let rs = normalize_as("t.tsv", b"a\tb\n1\tx\n2\ty\n", Format::Tsv).unwrap();
        assert_eq!(rs.width(), 2);
        assert_eq!(rs.rows(), 2);
        assert_eq!(rs.column("a").unwrap().ty, ColType::Int);
    }

    #[test]
    fn ndjson_skips_blank_lines() {
        let rs = normalize_as("-", b"{\"a\":1}\n\n   \n{\"a\":2}\n", Format::Ndjson).unwrap();
        assert_eq!(rs.rows(), 2);
        assert_eq!(rs.column("a").unwrap().null_count(), 0);
    }

    #[test]
    fn ndjson_error_reports_line_number() {
        match normalize_as("-", b"{\"a\":1}\nnot json\n", Format::Ndjson) {
            Err(AxError::Parse { message, .. }) => {
                assert!(message.starts_with("line 2:"), "{message}")
            }
            _ => panic!("expected a parse error for line 2"),
        }
    }

    #[test]
    fn json_top_level_object_is_one_row() {
        let rs = normalize_as("d.json", br#"{"x":1,"y":"a"}"#, Format::Json).unwrap();
        assert_eq!(rs.rows(), 1);
        assert_eq!(rs.width(), 2);
    }

    #[test]
    fn json_mixed_objects_and_scalars_pad_with_nulls() {
        let rs = normalize_as("d.json", br#"[{"x":1},5]"#, Format::Json).unwrap();
        assert_eq!(rs.rows(), 2);
        assert_eq!(rs.column("x").unwrap().null_count(), 1);
        assert_eq!(rs.column("value").unwrap().null_count(), 1);
    }
}