Skip to main content

anomalyx_normalize/
binary.rs

1//! Binary columnar formats via the Polars/Arrow backbone.
2//!
3//! This is the *only* module that knows Polars exists. It reads Parquet and
4//! Arrow IPC into a Polars `DataFrame` and lowers that to the engine-independent
5//! [`ax_core::RecordSet`] — so detectors, the envelope, and the contract never
6//! see a Polars type. Cell values map into the same closed [`Value`] set used by
7//! the text normalizer, and unsupported logical types (dates, nested, …) are
8//! preserved as their string form rather than guessed at.
9
10use crate::format::Format;
11use ax_core::{AxError, Column, Value};
12use polars::prelude::*;
13use std::io::Cursor;
14
15/// Reads a binary `fmt` into columns. Only [`Format::Parquet`] and
16/// [`Format::Arrow`] are handled; anything else is a programming error.
17pub fn read(fmt: Format, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
18    let df = match fmt {
19        Format::Parquet => ParquetReader::new(Cursor::new(bytes.to_vec()))
20            .finish()
21            .map_err(|e| parse_err(fmt, e))?,
22        Format::Arrow => IpcReader::new(Cursor::new(bytes.to_vec()))
23            .finish()
24            .map_err(|e| parse_err(fmt, e))?,
25        _ => {
26            return Err(AxError::UnknownFormat(format!(
27                "{} is not a binary format",
28                fmt.token()
29            )))
30        }
31    };
32    Ok(df_to_columns(&df))
33}
34
35/// Lowers a Polars `DataFrame` to `RecordSet` columns, preserving column order.
36fn df_to_columns(df: &DataFrame) -> Vec<Column> {
37    df.columns()
38        .iter()
39        .map(|col| {
40            let series = col.as_materialized_series();
41            let cells: Vec<Value> = series.iter().map(any_value_to_value).collect();
42            Column::new(col.name().as_str(), cells)
43        })
44        .collect()
45}
46
47/// Maps one Polars `AnyValue` into the closed [`Value`] set. Integer widths fold
48/// to `i64`; floats to `f64` (non-finite → `Null`); strings pass through; every
49/// other logical type is preserved as its display string (honest, not dropped).
50fn any_value_to_value(av: AnyValue) -> Value {
51    match av {
52        AnyValue::Null => Value::Null,
53        AnyValue::Boolean(b) => Value::Bool(b),
54        AnyValue::Int8(v) => Value::Int(v as i64),
55        AnyValue::Int16(v) => Value::Int(v as i64),
56        AnyValue::Int32(v) => Value::Int(v as i64),
57        AnyValue::Int64(v) => Value::Int(v),
58        AnyValue::UInt8(v) => Value::Int(v as i64),
59        AnyValue::UInt16(v) => Value::Int(v as i64),
60        AnyValue::UInt32(v) => Value::Int(v as i64),
61        // u64 can exceed i64; keep the exact value as a string rather than wrap.
62        AnyValue::UInt64(v) => match i64::try_from(v) {
63            Ok(i) => Value::Int(i),
64            Err(_) => Value::Str(v.to_string()),
65        },
66        AnyValue::Float32(v) => finite_float(v as f64),
67        AnyValue::Float64(v) => finite_float(v),
68        AnyValue::String(s) => Value::Str(s.to_string()),
69        AnyValue::StringOwned(s) => Value::Str(s.to_string()),
70        other => Value::Str(other.to_string()),
71    }
72}
73
74/// A finite float becomes `Float`; NaN/±∞ become `Null` (honest absence).
75fn finite_float(f: f64) -> Value {
76    if f.is_finite() {
77        Value::Float(f)
78    } else {
79        Value::Null
80    }
81}
82
83fn parse_err(fmt: Format, e: impl std::fmt::Display) -> AxError {
84    AxError::Parse {
85        format: fmt.token().to_string(),
86        message: e.to_string(),
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Serializes `df` to an in-memory Parquet buffer.
    fn parquet_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut buf = Vec::new();
        ParquetWriter::new(&mut buf).finish(df).unwrap();
        buf
    }

    /// Serializes `df` to an in-memory Arrow IPC buffer.
    fn arrow_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut buf = Vec::new();
        IpcWriter::new(&mut buf).finish(df).unwrap();
        buf
    }

    #[test]
    fn any_value_mapping_is_exact() {
        assert_eq!(any_value_to_value(AnyValue::Null), Value::Null);
        assert_eq!(
            any_value_to_value(AnyValue::Boolean(true)),
            Value::Bool(true)
        );
        assert_eq!(any_value_to_value(AnyValue::Int32(5)), Value::Int(5));
        assert_eq!(any_value_to_value(AnyValue::Int64(-9)), Value::Int(-9));
        assert_eq!(any_value_to_value(AnyValue::UInt8(7)), Value::Int(7));
        assert_eq!(
            any_value_to_value(AnyValue::Float64(1.5)),
            Value::Float(1.5)
        );
        assert_eq!(
            any_value_to_value(AnyValue::String("hi")),
            Value::Str("hi".into())
        );
        // u64 beyond i64::MAX is preserved as a string, not wrapped
        assert_eq!(
            any_value_to_value(AnyValue::UInt64(u64::MAX)),
            Value::Str(u64::MAX.to_string())
        );
    }

    #[test]
    fn u64_switches_to_string_exactly_past_i64_max() {
        // In-range values take the integer branch...
        assert_eq!(any_value_to_value(AnyValue::UInt64(0)), Value::Int(0));
        assert_eq!(any_value_to_value(AnyValue::UInt64(7)), Value::Int(7));
        // ...up to and including i64::MAX...
        let largest_fitting = i64::MAX as u64;
        assert_eq!(
            any_value_to_value(AnyValue::UInt64(largest_fitting)),
            Value::Int(i64::MAX)
        );
        // ...and the very next value falls back to its exact digits.
        let first_overflow = largest_fitting + 1;
        assert_eq!(
            any_value_to_value(AnyValue::UInt64(first_overflow)),
            Value::Str("9223372036854775808".into())
        );
    }

    #[test]
    fn floats_route_through_finite_check_in_mapper() {
        // 0.25 is exactly representable in both f32 and f64.
        assert_eq!(
            any_value_to_value(AnyValue::Float32(0.25)),
            Value::Float(0.25)
        );
        assert_eq!(any_value_to_value(AnyValue::Float32(f32::NAN)), Value::Null);
        assert_eq!(
            any_value_to_value(AnyValue::Float32(f32::INFINITY)),
            Value::Null
        );
        assert_eq!(any_value_to_value(AnyValue::Float64(f64::NAN)), Value::Null);
        assert_eq!(
            any_value_to_value(AnyValue::Float64(f64::NEG_INFINITY)),
            Value::Null
        );
    }

    #[test]
    fn non_finite_float_becomes_null() {
        assert_eq!(finite_float(f64::NAN), Value::Null);
        assert_eq!(finite_float(f64::INFINITY), Value::Null);
        assert_eq!(finite_float(f64::NEG_INFINITY), Value::Null);
        assert_eq!(finite_float(2.0), Value::Float(2.0));
    }

    #[test]
    fn parquet_roundtrips_to_recordset() {
        let mut df = df![
            "amount" => [10i64, 20, 30],
            "tier" => ["a", "b", "c"],
        ]
        .unwrap();
        let cols = read(Format::Parquet, &parquet_bytes(&mut df)).unwrap();
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].name, "amount");
        assert_eq!(cols[0].ty, ColType::Int);
        assert_eq!(cols[0].numeric(), vec![10.0, 20.0, 30.0]);
        assert_eq!(cols[1].name, "tier");
        assert_eq!(cols[1].ty, ColType::Str);
    }

    #[test]
    fn arrow_roundtrips_to_recordset() {
        let mut df = df![
            "x" => [1.5f64, 2.5, 3.5],
            "ok" => [true, false, true],
        ]
        .unwrap();
        let cols = read(Format::Arrow, &arrow_bytes(&mut df)).unwrap();
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].ty, ColType::Float);
        assert_eq!(cols[1].ty, ColType::Bool);
    }

    #[test]
    fn nulls_survive_the_roundtrip() {
        let s = Series::new("v".into(), &[Some(1i64), None, Some(3)]);
        let mut df = DataFrame::new_infer_height(vec![s.into()]).unwrap();
        let cols = read(Format::Parquet, &parquet_bytes(&mut df)).unwrap();
        assert_eq!(cols[0].null_count(), 1);
    }

    #[test]
    fn normalize_routes_binary_through_read_binary() {
        // Exercises the crate-level dispatch (lib::read_binary), not just read().
        let mut df = df!["a" => [1i64, 2, 3], "b" => [4i64, 5, 6]].unwrap();
        let rs =
            crate::normalize_as("t.parquet", &parquet_bytes(&mut df), Format::Parquet).unwrap();
        assert_eq!(rs.width(), 2);
        assert_eq!(rs.rows(), 3);
        assert_eq!(rs.format, "parquet");
    }

    #[test]
    fn non_binary_format_is_rejected() {
        assert!(matches!(
            read(Format::Csv, b"a,b"),
            Err(AxError::UnknownFormat(_))
        ));
    }

    #[test]
    fn corrupt_bytes_fail_cleanly() {
        assert!(matches!(
            read(Format::Parquet, b"PAR1 not really parquet"),
            Err(AxError::Parse { .. })
        ));
    }
}