1use crate::format::Format;
11use ax_core::{AxError, Column, Value};
12use polars::prelude::*;
13use std::io::Cursor;
14
15pub fn read(fmt: Format, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
18 let df = match fmt {
19 Format::Parquet => ParquetReader::new(Cursor::new(bytes.to_vec()))
20 .finish()
21 .map_err(|e| parse_err(fmt, e))?,
22 Format::Arrow => IpcReader::new(Cursor::new(bytes.to_vec()))
23 .finish()
24 .map_err(|e| parse_err(fmt, e))?,
25 _ => {
26 return Err(AxError::UnknownFormat(format!(
27 "{} is not a binary format",
28 fmt.token()
29 )))
30 }
31 };
32 Ok(df_to_columns(&df))
33}
34
35fn df_to_columns(df: &DataFrame) -> Vec<Column> {
37 df.columns()
38 .iter()
39 .map(|col| {
40 let series = col.as_materialized_series();
41 let cells: Vec<Value> = series.iter().map(any_value_to_value).collect();
42 Column::new(col.name().as_str(), cells)
43 })
44 .collect()
45}
46
47fn any_value_to_value(av: AnyValue) -> Value {
51 match av {
52 AnyValue::Null => Value::Null,
53 AnyValue::Boolean(b) => Value::Bool(b),
54 AnyValue::Int8(v) => Value::Int(v as i64),
55 AnyValue::Int16(v) => Value::Int(v as i64),
56 AnyValue::Int32(v) => Value::Int(v as i64),
57 AnyValue::Int64(v) => Value::Int(v),
58 AnyValue::UInt8(v) => Value::Int(v as i64),
59 AnyValue::UInt16(v) => Value::Int(v as i64),
60 AnyValue::UInt32(v) => Value::Int(v as i64),
61 AnyValue::UInt64(v) => match i64::try_from(v) {
63 Ok(i) => Value::Int(i),
64 Err(_) => Value::Str(v.to_string()),
65 },
66 AnyValue::Float32(v) => finite_float(v as f64),
67 AnyValue::Float64(v) => finite_float(v),
68 AnyValue::String(s) => Value::Str(s.to_string()),
69 AnyValue::StringOwned(s) => Value::Str(s.to_string()),
70 other => Value::Str(other.to_string()),
71 }
72}
73
74fn finite_float(f: f64) -> Value {
76 if f.is_finite() {
77 Value::Float(f)
78 } else {
79 Value::Null
80 }
81}
82
83fn parse_err(fmt: Format, e: impl std::fmt::Display) -> AxError {
84 AxError::Parse {
85 format: fmt.token().to_string(),
86 message: e.to_string(),
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Serializes `df` into an in-memory Parquet file.
    fn parquet_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut sink: Vec<u8> = Vec::new();
        ParquetWriter::new(&mut sink).finish(df).unwrap();
        sink
    }

    /// Serializes `df` into an in-memory Arrow IPC file.
    fn arrow_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut sink: Vec<u8> = Vec::new();
        IpcWriter::new(&mut sink).finish(df).unwrap();
        sink
    }

    #[test]
    fn any_value_mapping_is_exact() {
        let cases = [
            (AnyValue::Null, Value::Null),
            (AnyValue::Boolean(true), Value::Bool(true)),
            (AnyValue::Int32(5), Value::Int(5)),
            (AnyValue::Int64(-9), Value::Int(-9)),
            (AnyValue::UInt8(7), Value::Int(7)),
            (AnyValue::Float64(1.5), Value::Float(1.5)),
            (AnyValue::String("hi"), Value::Str("hi".into())),
            // u64 values beyond i64::MAX fall back to their decimal text.
            (AnyValue::UInt64(u64::MAX), Value::Str(u64::MAX.to_string())),
        ];
        for (input, expected) in cases {
            assert_eq!(any_value_to_value(input), expected);
        }
    }

    #[test]
    fn non_finite_float_becomes_null() {
        for bad in [f64::NAN, f64::INFINITY] {
            assert_eq!(finite_float(bad), Value::Null);
        }
        assert_eq!(finite_float(2.0), Value::Float(2.0));
    }

    #[test]
    fn parquet_roundtrips_to_recordset() {
        let mut frame = df![
            "amount" => [10i64, 20, 30],
            "tier" => ["a", "b", "c"],
        ]
        .unwrap();
        let bytes = parquet_bytes(&mut frame);
        let cols = read(Format::Parquet, &bytes).unwrap();
        assert_eq!(cols.len(), 2);

        let (amount, tier) = (&cols[0], &cols[1]);
        assert_eq!(amount.name, "amount");
        assert_eq!(amount.ty, ColType::Int);
        assert_eq!(amount.numeric(), vec![10.0, 20.0, 30.0]);
        assert_eq!(tier.name, "tier");
        assert_eq!(tier.ty, ColType::Str);
    }

    #[test]
    fn arrow_roundtrips_to_recordset() {
        let mut frame = df![
            "x" => [1.5f64, 2.5, 3.5],
            "ok" => [true, false, true],
        ]
        .unwrap();
        let bytes = arrow_bytes(&mut frame);
        let cols = read(Format::Arrow, &bytes).unwrap();
        assert_eq!(cols.len(), 2);

        let (x, ok) = (&cols[0], &cols[1]);
        assert_eq!(x.ty, ColType::Float);
        assert_eq!(ok.ty, ColType::Bool);
    }

    #[test]
    fn nulls_survive_the_roundtrip() {
        let values = Series::new("v".into(), &[Some(1i64), None, Some(3)]);
        let mut frame = DataFrame::new_infer_height(vec![values.into()]).unwrap();
        let bytes = parquet_bytes(&mut frame);
        let cols = read(Format::Parquet, &bytes).unwrap();
        assert_eq!(cols[0].null_count(), 1);
    }

    #[test]
    fn normalize_routes_binary_through_read_binary() {
        let mut frame = df!["a" => [1i64, 2, 3], "b" => [4i64, 5, 6]].unwrap();
        let bytes = parquet_bytes(&mut frame);
        let rs = crate::normalize_as("t.parquet", &bytes, Format::Parquet).unwrap();
        assert_eq!(rs.width(), 2);
        assert_eq!(rs.rows(), 3);
        assert_eq!(rs.format, "parquet");
    }

    #[test]
    fn non_binary_format_is_rejected() {
        let outcome = read(Format::Csv, b"a,b");
        assert!(matches!(outcome, Err(AxError::UnknownFormat(_))));
    }

    #[test]
    fn corrupt_bytes_fail_cleanly() {
        let outcome = read(Format::Parquet, b"PAR1 not really parquet");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));
    }
}