anomalyx_normalize/parsers/
columnar.rs1use crate::parser::{Confidence, FormatParser, MAGIC};
11use ax_core::{AxError, Column, Value};
12use polars::prelude::*;
13use std::io::Cursor;
14
15fn df_to_columns(df: &DataFrame) -> Vec<Column> {
17 df.columns()
18 .iter()
19 .map(|col| {
20 let series = col.as_materialized_series();
21 let cells: Vec<Value> = series.iter().map(any_value_to_value).collect();
22 Column::new(col.name().as_str(), cells)
23 })
24 .collect()
25}
26
27fn any_value_to_value(av: AnyValue) -> Value {
31 match av {
32 AnyValue::Null => Value::Null,
33 AnyValue::Boolean(b) => Value::Bool(b),
34 AnyValue::Int8(v) => Value::Int(v as i64),
35 AnyValue::Int16(v) => Value::Int(v as i64),
36 AnyValue::Int32(v) => Value::Int(v as i64),
37 AnyValue::Int64(v) => Value::Int(v),
38 AnyValue::UInt8(v) => Value::Int(v as i64),
39 AnyValue::UInt16(v) => Value::Int(v as i64),
40 AnyValue::UInt32(v) => Value::Int(v as i64),
41 AnyValue::UInt64(v) => match i64::try_from(v) {
43 Ok(i) => Value::Int(i),
44 Err(_) => Value::Str(v.to_string()),
45 },
46 AnyValue::Float32(v) => finite_float(v as f64),
47 AnyValue::Float64(v) => finite_float(v),
48 AnyValue::String(s) => Value::Str(s.to_string()),
49 AnyValue::StringOwned(s) => Value::Str(s.to_string()),
50 other => Value::Str(other.to_string()),
51 }
52}
53
54fn finite_float(f: f64) -> Value {
56 if f.is_finite() {
57 Value::Float(f)
58 } else {
59 Value::Null
60 }
61}
62
63fn parse_err(id: &str, e: impl std::fmt::Display) -> AxError {
64 AxError::Parse {
65 format: id.to_string(),
66 message: e.to_string(),
67 }
68}
69
70#[derive(Debug, Default, Clone)]
71pub struct ParquetParser;
72
73impl FormatParser for ParquetParser {
74 fn id(&self) -> &'static str {
75 "parquet"
76 }
77 fn extensions(&self) -> &'static [&'static str] {
78 &["parquet", "pq"]
79 }
80 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
81 bytes.starts_with(b"PAR1").then_some(MAGIC)
83 }
84 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
85 let df = ParquetReader::new(Cursor::new(bytes.to_vec()))
86 .finish()
87 .map_err(|e| parse_err(self.id(), e))?;
88 Ok(df_to_columns(&df))
89 }
90}
91
92#[derive(Debug, Default, Clone)]
93pub struct ArrowParser;
94
95impl FormatParser for ArrowParser {
96 fn id(&self) -> &'static str {
97 "arrow"
98 }
99 fn extensions(&self) -> &'static [&'static str] {
100 &["arrow", "ipc", "feather"]
101 }
102 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
103 bytes.starts_with(b"ARROW1").then_some(MAGIC)
105 }
106 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
107 let df = IpcReader::new(Cursor::new(bytes.to_vec()))
108 .finish()
109 .map_err(|e| parse_err(self.id(), e))?;
110 Ok(df_to_columns(&df))
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Encodes `df` as an in-memory Parquet file.
    fn parquet_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut buf = Vec::new();
        ParquetWriter::new(&mut buf).finish(df).unwrap();
        buf
    }

    /// Encodes `df` as an in-memory Arrow IPC file.
    fn arrow_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut buf = Vec::new();
        IpcWriter::new(&mut buf).finish(df).unwrap();
        buf
    }

    #[test]
    fn any_value_mapping_is_exact() {
        assert_eq!(any_value_to_value(AnyValue::Null), Value::Null);
        assert_eq!(
            any_value_to_value(AnyValue::Boolean(true)),
            Value::Bool(true)
        );
        assert_eq!(any_value_to_value(AnyValue::Int8(-3)), Value::Int(-3));
        assert_eq!(any_value_to_value(AnyValue::Int16(300)), Value::Int(300));
        assert_eq!(any_value_to_value(AnyValue::Int32(5)), Value::Int(5));
        assert_eq!(any_value_to_value(AnyValue::Int64(-9)), Value::Int(-9));
        assert_eq!(any_value_to_value(AnyValue::UInt8(7)), Value::Int(7));
        assert_eq!(
            any_value_to_value(AnyValue::UInt32(u32::MAX)),
            Value::Int(i64::from(u32::MAX))
        );
        // u64 values that fit in i64 stay integers, up to and including i64::MAX...
        assert_eq!(any_value_to_value(AnyValue::UInt64(42)), Value::Int(42));
        assert_eq!(
            any_value_to_value(AnyValue::UInt64(i64::MAX as u64)),
            Value::Int(i64::MAX)
        );
        // ...while larger ones keep their exact decimal text.
        assert_eq!(
            any_value_to_value(AnyValue::UInt64(u64::MAX)),
            Value::Str(u64::MAX.to_string())
        );
        assert_eq!(
            any_value_to_value(AnyValue::Float64(1.5)),
            Value::Float(1.5)
        );
        assert_eq!(
            any_value_to_value(AnyValue::Float32(0.5)),
            Value::Float(0.5)
        );
        assert_eq!(
            any_value_to_value(AnyValue::String("hi")),
            Value::Str("hi".into())
        );
        assert_eq!(
            any_value_to_value(AnyValue::StringOwned("owned".into())),
            Value::Str("owned".into())
        );
    }

    #[test]
    fn non_finite_float_becomes_null() {
        assert_eq!(finite_float(f64::NAN), Value::Null);
        assert_eq!(finite_float(f64::INFINITY), Value::Null);
        assert_eq!(finite_float(f64::NEG_INFINITY), Value::Null);
        assert_eq!(finite_float(2.0), Value::Float(2.0));
        // The AnyValue path applies the same rule, for both float widths.
        assert_eq!(any_value_to_value(AnyValue::Float64(f64::NAN)), Value::Null);
        assert_eq!(
            any_value_to_value(AnyValue::Float32(f32::INFINITY)),
            Value::Null
        );
    }

    #[test]
    fn parquet_roundtrips_to_columns() {
        let mut df = df!["amount" => [10i64, 20, 30], "tier" => ["a", "b", "c"]].unwrap();
        let cols = ParquetParser
            .parse("d.parquet", &parquet_bytes(&mut df))
            .unwrap();
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].name, "amount");
        assert_eq!(cols[0].ty, ColType::Int);
        assert_eq!(cols[0].numeric(), vec![10.0, 20.0, 30.0]);
        assert_eq!(cols[1].name, "tier");
        assert_eq!(cols[1].ty, ColType::Str);
    }

    #[test]
    fn arrow_roundtrips_to_columns() {
        let mut df = df!["x" => [1.5f64, 2.5, 3.5], "ok" => [true, false, true]].unwrap();
        let cols = ArrowParser.parse("d.arrow", &arrow_bytes(&mut df)).unwrap();
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].ty, ColType::Float);
        assert_eq!(cols[1].ty, ColType::Bool);
    }

    #[test]
    fn nulls_survive_the_roundtrip() {
        let s = Series::new("v".into(), &[Some(1i64), None, Some(3)]);
        let mut df = DataFrame::new_infer_height(vec![s.into()]).unwrap();
        let cols = ParquetParser
            .parse("d.parquet", &parquet_bytes(&mut df))
            .unwrap();
        assert_eq!(cols[0].null_count(), 1);
    }

    #[test]
    fn corrupt_bytes_fail_cleanly() {
        assert!(matches!(
            ParquetParser.parse("d.parquet", b"PAR1 not really parquet"),
            Err(AxError::Parse { .. })
        ));
    }

    #[test]
    fn sniff_matches_magic() {
        assert_eq!(ParquetParser.sniff(b"PAR1xxxx"), Some(MAGIC));
        assert_eq!(ParquetParser.sniff(b"nope"), None);
        assert_eq!(ParquetParser.sniff(b""), None);
        assert_eq!(ArrowParser.sniff(b"ARROW1\x00"), Some(MAGIC));
        assert_eq!(ArrowParser.sniff(b"nope"), None);
        // A truncated magic must not match.
        assert_eq!(ArrowParser.sniff(b"ARROW"), None);
    }
}