// anomalyx_normalize/parsers/columnar.rs

//! Binary columnar parsers (Parquet, Arrow IPC) via the Polars/Arrow backbone.
//!
//! This is the *only* module that knows Polars exists. Each reader produces a
//! Polars `DataFrame` and lowers it to engine-independent [`Column`]s — so no
//! Polars type ever reaches a detector, the envelope, or the contract. Cell
//! values map into the same closed [`Value`] set as the text parsers; logical
//! types Polars exposes but anomalyx doesn't model are preserved as their
//! string form rather than guessed at.
9
10use crate::parser::{Confidence, FormatParser, MAGIC};
11use ax_core::{AxError, Column, Value};
12use polars::prelude::*;
13use std::io::Cursor;
14
15/// Lowers a Polars `DataFrame` to `Column`s, preserving column order.
16fn df_to_columns(df: &DataFrame) -> Vec<Column> {
17    df.columns()
18        .iter()
19        .map(|col| {
20            let series = col.as_materialized_series();
21            let cells: Vec<Value> = series.iter().map(any_value_to_value).collect();
22            Column::new(col.name().as_str(), cells)
23        })
24        .collect()
25}
26
27/// Maps one Polars `AnyValue` into the closed [`Value`] set. Integer widths fold
28/// to `i64`; floats to `f64` (non-finite → `Null`); strings pass through; any
29/// other logical type is preserved as its display string (honest, not dropped).
30fn any_value_to_value(av: AnyValue) -> Value {
31    match av {
32        AnyValue::Null => Value::Null,
33        AnyValue::Boolean(b) => Value::Bool(b),
34        AnyValue::Int8(v) => Value::Int(v as i64),
35        AnyValue::Int16(v) => Value::Int(v as i64),
36        AnyValue::Int32(v) => Value::Int(v as i64),
37        AnyValue::Int64(v) => Value::Int(v),
38        AnyValue::UInt8(v) => Value::Int(v as i64),
39        AnyValue::UInt16(v) => Value::Int(v as i64),
40        AnyValue::UInt32(v) => Value::Int(v as i64),
41        // u64 can exceed i64; keep the exact value as a string rather than wrap.
42        AnyValue::UInt64(v) => match i64::try_from(v) {
43            Ok(i) => Value::Int(i),
44            Err(_) => Value::Str(v.to_string()),
45        },
46        AnyValue::Float32(v) => finite_float(v as f64),
47        AnyValue::Float64(v) => finite_float(v),
48        AnyValue::String(s) => Value::Str(s.to_string()),
49        AnyValue::StringOwned(s) => Value::Str(s.to_string()),
50        other => Value::Str(other.to_string()),
51    }
52}
53
54/// A finite float becomes `Float`; NaN/±∞ become `Null` (honest absence).
55fn finite_float(f: f64) -> Value {
56    if f.is_finite() {
57        Value::Float(f)
58    } else {
59        Value::Null
60    }
61}
62
63fn parse_err(id: &str, e: impl std::fmt::Display) -> AxError {
64    AxError::Parse {
65        format: id.to_string(),
66        message: e.to_string(),
67    }
68}
69
70#[derive(Debug, Default, Clone)]
71pub struct ParquetParser;
72
73impl FormatParser for ParquetParser {
74    fn id(&self) -> &'static str {
75        "parquet"
76    }
77    fn extensions(&self) -> &'static [&'static str] {
78        &["parquet", "pq"]
79    }
80    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
81        // Parquet files begin (and end) with the 4-byte magic "PAR1".
82        bytes.starts_with(b"PAR1").then_some(MAGIC)
83    }
84    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
85        let df = ParquetReader::new(Cursor::new(bytes.to_vec()))
86            .finish()
87            .map_err(|e| parse_err(self.id(), e))?;
88        Ok(df_to_columns(&df))
89    }
90}
91
92#[derive(Debug, Default, Clone)]
93pub struct ArrowParser;
94
95impl FormatParser for ArrowParser {
96    fn id(&self) -> &'static str {
97        "arrow"
98    }
99    fn extensions(&self) -> &'static [&'static str] {
100        &["arrow", "ipc", "feather"]
101    }
102    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
103        // Arrow IPC files begin with "ARROW1".
104        bytes.starts_with(b"ARROW1").then_some(MAGIC)
105    }
106    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
107        let df = IpcReader::new(Cursor::new(bytes.to_vec()))
108            .finish()
109            .map_err(|e| parse_err(self.id(), e))?;
110        Ok(df_to_columns(&df))
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Encodes `df` into an in-memory buffer with Polars' `ParquetWriter`.
    fn parquet_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut encoded: Vec<u8> = Vec::new();
        ParquetWriter::new(&mut encoded).finish(df).unwrap();
        encoded
    }

    /// Encodes `df` into an in-memory buffer with Polars' `IpcWriter`.
    fn arrow_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut encoded: Vec<u8> = Vec::new();
        IpcWriter::new(&mut encoded).finish(df).unwrap();
        encoded
    }

    #[test]
    fn any_value_mapping_is_exact() {
        // (input cell, expected lowered value)
        let cases = vec![
            (AnyValue::Null, Value::Null),
            (AnyValue::Boolean(true), Value::Bool(true)),
            (AnyValue::Int32(5), Value::Int(5)),
            (AnyValue::Int64(-9), Value::Int(-9)),
            (AnyValue::UInt8(7), Value::Int(7)),
            (AnyValue::Float64(1.5), Value::Float(1.5)),
            (AnyValue::String("hi"), Value::Str("hi".into())),
            // A u64 beyond i64::MAX is preserved as a string, not wrapped.
            (AnyValue::UInt64(u64::MAX), Value::Str(u64::MAX.to_string())),
        ];
        for (input, expected) in cases {
            assert_eq!(any_value_to_value(input), expected);
        }
    }

    #[test]
    fn non_finite_float_becomes_null() {
        for non_finite in [f64::NAN, f64::INFINITY] {
            assert_eq!(finite_float(non_finite), Value::Null);
        }
        assert_eq!(finite_float(2.0), Value::Float(2.0));
    }

    #[test]
    fn parquet_roundtrips_to_columns() {
        let mut frame = df!("amount" => [10i64, 20, 30], "tier" => ["a", "b", "c"]).unwrap();
        let encoded = parquet_bytes(&mut frame);
        let cols = ParquetParser.parse("d.parquet", &encoded).unwrap();

        assert_eq!(cols.len(), 2);

        let amount = &cols[0];
        assert_eq!(amount.name, "amount");
        assert_eq!(amount.ty, ColType::Int);
        assert_eq!(amount.numeric(), vec![10.0, 20.0, 30.0]);

        assert_eq!(cols[1].ty, ColType::Str);
    }

    #[test]
    fn arrow_roundtrips_to_columns() {
        let mut frame = df!("x" => [1.5f64, 2.5, 3.5], "ok" => [true, false, true]).unwrap();
        let encoded = arrow_bytes(&mut frame);
        let cols = ArrowParser.parse("d.arrow", &encoded).unwrap();

        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].ty, ColType::Float);
        assert_eq!(cols[1].ty, ColType::Bool);
    }

    #[test]
    fn nulls_survive_the_roundtrip() {
        let with_gap = Series::new("v".into(), &[Some(1i64), None, Some(3)]);
        let mut frame = DataFrame::new_infer_height(vec![with_gap.into()]).unwrap();
        let encoded = parquet_bytes(&mut frame);
        let cols = ParquetParser.parse("d.parquet", &encoded).unwrap();

        assert_eq!(cols[0].null_count(), 1);
    }

    #[test]
    fn corrupt_bytes_fail_cleanly() {
        // Correct magic prefix, but nothing resembling a Parquet file after it.
        let outcome = ParquetParser.parse("d.parquet", b"PAR1 not really parquet");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));
    }

    #[test]
    fn sniff_matches_magic() {
        assert_eq!(ParquetParser.sniff(b"PAR1xxxx"), Some(MAGIC));
        assert_eq!(ParquetParser.sniff(b"nope"), None);

        assert_eq!(ArrowParser.sniff(b"ARROW1\x00"), Some(MAGIC));
        assert_eq!(ArrowParser.sniff(b"nope"), None);
    }
}