Skip to main content

anomalyx_normalize/
lib.rs

1//! # ax-normalize — any corpus → one [`RecordSet`]
2//!
3//! The article's normalization promise: *"given any corpus of information
4//! regardless of its format, we'll normalize it."* This crate maps recognized
5//! text formats (CSV, TSV, NDJSON, JSON) onto the engine-independent
6//! [`RecordSet`] from `ax-core`. Binary columnar formats (Parquet, Arrow IPC)
7//! land behind this same boundary in the Polars-backed slice — detectors never
8//! see the difference.
9//!
10//! Normalization is deterministic: column order is stable (header order for
11//! tabular input, sorted key-union for JSON), and absence is explicit — a key
12//! missing from one JSON row becomes [`ax_core::Value::Null`], never a guess.
13
14use ax_core::{AxError, Column, RecordSet, Value};
15use std::collections::BTreeMap;
16
17#[cfg(feature = "polars")]
18pub mod binary;
19pub mod format;
20pub mod infer;
21
22pub use format::Format;
23
24/// Reads a binary columnar format (Parquet/Arrow) into columns. Behind the
25/// `polars` feature; without it, binary formats fail cleanly (honest absence)
26/// rather than the crate silently mis-handling them.
27fn read_binary(fmt: Format, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
28    #[cfg(feature = "polars")]
29    {
30        binary::read(fmt, bytes)
31    }
32    #[cfg(not(feature = "polars"))]
33    {
34        let _ = bytes;
35        Err(AxError::Config(format!(
36            "{} requires the 'polars' feature, which was not built",
37            fmt.token()
38        )))
39    }
40}
41
42/// Normalizes `bytes` from logical `source` into a [`RecordSet`], resolving the
43/// format by extension then content sniff.
44pub fn normalize(source: &str, bytes: &[u8]) -> Result<RecordSet, AxError> {
45    let fmt = Format::resolve(source, bytes)?;
46    normalize_as(source, bytes, fmt)
47}
48
49/// Normalizes with an explicit format (skips detection).
50pub fn normalize_as(source: &str, bytes: &[u8], fmt: Format) -> Result<RecordSet, AxError> {
51    let columns = match fmt {
52        Format::Csv => read_delimited(bytes, b',', fmt)?,
53        Format::Tsv => read_delimited(bytes, b'\t', fmt)?,
54        Format::Ndjson => read_ndjson(bytes, fmt)?,
55        Format::Json => read_json(bytes, fmt)?,
56        Format::Parquet | Format::Arrow => read_binary(fmt, bytes)?,
57    };
58    Ok(RecordSet::new(source, fmt.token(), columns))
59}
60
61/// Reads CSV/TSV with a header row. Field count is normalized to the header
62/// width: short rows pad with [`Value::Null`], long rows truncate.
63fn read_delimited(bytes: &[u8], delim: u8, fmt: Format) -> Result<Vec<Column>, AxError> {
64    let mut rdr = csv::ReaderBuilder::new()
65        .delimiter(delim)
66        .flexible(true)
67        .has_headers(true)
68        .from_reader(bytes);
69
70    let headers = rdr
71        .headers()
72        .map_err(|e| parse_err(fmt, e))?
73        .iter()
74        .map(|h| h.to_string())
75        .collect::<Vec<_>>();
76
77    let mut cols: Vec<Vec<Value>> = vec![Vec::new(); headers.len()];
78    for rec in rdr.records() {
79        let rec = rec.map_err(|e| parse_err(fmt, e))?;
80        for (i, col) in cols.iter_mut().enumerate() {
81            match rec.get(i) {
82                Some(field) => col.push(infer::infer_scalar(field)),
83                None => col.push(Value::Null),
84            }
85        }
86    }
87
88    Ok(headers
89        .into_iter()
90        .zip(cols)
91        .map(|(name, cells)| Column::new(name, cells))
92        .collect())
93}
94
95/// Reads newline-delimited JSON. Each non-empty line is one record; scalar or
96/// array lines are placed under a synthetic `value` column.
97fn read_ndjson(bytes: &[u8], fmt: Format) -> Result<Vec<Column>, AxError> {
98    let text = std::str::from_utf8(bytes).map_err(|e| parse_err(fmt, e))?;
99    let mut builder = TableBuilder::new();
100    for (lineno, line) in text.lines().enumerate() {
101        if line.trim().is_empty() {
102            continue;
103        }
104        let val: serde_json::Value = serde_json::from_str(line).map_err(|e| AxError::Parse {
105            format: fmt.token().to_string(),
106            message: format!("line {}: {e}", lineno + 1),
107        })?;
108        builder.push_value(val);
109    }
110    Ok(builder.finish())
111}
112
113/// Reads a single JSON document: an array of records, a lone object (one row),
114/// or a scalar/array (one `value` cell).
115fn read_json(bytes: &[u8], fmt: Format) -> Result<Vec<Column>, AxError> {
116    let val: serde_json::Value = serde_json::from_slice(bytes).map_err(|e| parse_err(fmt, e))?;
117    let mut builder = TableBuilder::new();
118    match val {
119        serde_json::Value::Array(items) => {
120            for item in items {
121                builder.push_value(item);
122            }
123        }
124        other => builder.push_value(other),
125    }
126    Ok(builder.finish())
127}
128
129const VALUE_COL: &str = "value";
130
131/// Accumulates JSON records into columns with a stable, sorted key-union order.
132/// Missing keys fill with [`Value::Null`] so every column ends equal length.
133struct TableBuilder {
134    order: Vec<String>,
135    index: BTreeMap<String, usize>,
136    cols: Vec<Vec<Value>>,
137    rows: usize,
138}
139
140impl TableBuilder {
141    fn new() -> Self {
142        TableBuilder {
143            order: Vec::new(),
144            index: BTreeMap::new(),
145            cols: Vec::new(),
146            rows: 0,
147        }
148    }
149
150    /// Ensures a column exists, back-filling it with `Null` for prior rows.
151    fn ensure(&mut self, name: &str) -> usize {
152        if let Some(&i) = self.index.get(name) {
153            return i;
154        }
155        let i = self.order.len();
156        self.order.push(name.to_string());
157        self.index.insert(name.to_string(), i);
158        self.cols.push(vec![Value::Null; self.rows]);
159        i
160    }
161
162    /// Adds one record. Objects contribute their fields; anything else goes to
163    /// the synthetic `value` column.
164    fn push_value(&mut self, val: serde_json::Value) {
165        let mut row: BTreeMap<String, Value> = BTreeMap::new();
166        match val {
167            serde_json::Value::Object(map) => {
168                for (k, v) in map {
169                    row.insert(k, infer::json_to_value(&v));
170                }
171            }
172            other => {
173                row.insert(VALUE_COL.to_string(), infer::json_to_value(&other));
174            }
175        }
176        for k in row.keys() {
177            self.ensure(k);
178        }
179        for (name, &i) in &self.index {
180            let cell = row.remove(name).unwrap_or(Value::Null);
181            self.cols[i].push(cell);
182        }
183        self.rows += 1;
184    }
185
186    fn finish(self) -> Vec<Column> {
187        self.order
188            .into_iter()
189            .zip(self.cols)
190            .map(|(name, cells)| Column::new(name, cells))
191            .collect()
192    }
193}
194
195fn parse_err(fmt: Format, e: impl std::fmt::Display) -> AxError {
196    AxError::Parse {
197        format: fmt.token().to_string(),
198        message: e.to_string(),
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    #[test]
    fn csv_roundtrip_types_and_nulls() {
        let rs = normalize("t.csv", b"a,b\n1,x\n2,\n3,z").unwrap();
        assert_eq!(rs.width(), 2);
        assert_eq!(rs.rows(), 3);
        let a = rs.column("a").unwrap();
        assert_eq!(a.ty, ColType::Int);
        let b = rs.column("b").unwrap();
        assert_eq!(b.null_count(), 1);
    }

    #[test]
    fn ndjson_key_union_pads_missing() {
        let rs = normalize("-", b"{\"a\":1}\n{\"a\":2,\"b\":9}\n").unwrap();
        assert_eq!(rs.rows(), 2);
        let b = rs.column("b").unwrap();
        // first row had no `b`
        assert_eq!(b.null_count(), 1);
    }

    #[test]
    fn json_array_of_objects() {
        let rs = normalize("d.json", br#"[{"x":10},{"x":20},{"x":30}]"#).unwrap();
        assert_eq!(rs.rows(), 3);
        assert_eq!(rs.column("x").unwrap().ty, ColType::Int);
    }

    #[test]
    fn json_scalar_goes_to_value_column() {
        let rs = normalize("d.json", b"[1,2,3]").unwrap();
        assert_eq!(rs.column("value").unwrap().numeric(), vec![1.0, 2.0, 3.0]);
    }

    #[test]
    fn unknown_format_errors() {
        assert!(normalize("-", &[0x00, 0x01, 0x02, 0xff]).is_err());
    }

    #[test]
    fn ragged_csv_pads_and_truncates() {
        // Two-column header. Row 1 is short (one field) and row 2 is long
        // (three fields).
        let rs = normalize("t.csv", b"a,b\n1\n2,3,4").unwrap();
        assert_eq!(rs.rows(), 2);
        // Truncation: the surplus trailing field must not create a third column.
        assert_eq!(rs.width(), 2);
        assert_eq!(rs.column("a").unwrap().null_count(), 0);
        // Padding: the short row's missing `b` becomes an explicit Null.
        let b = rs.column("b").unwrap();
        assert_eq!(b.cells[0], Value::Null);
        assert_eq!(b.null_count(), 1);
    }
}