1use crate::infer;
17use crate::parser::{Confidence, FormatParser, MAGIC};
18use crate::table::TableBuilder;
19use ax_core::{AxError, Column, Value};
20use std::collections::BTreeMap;
21
22fn finite_float(f: f64) -> Value {
24 if f.is_finite() {
25 Value::Float(f)
26 } else {
27 Value::Null
28 }
29}
30
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string. The output is written into one
/// preallocated buffer instead of formatting a temporary `String` per byte.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, so 0xab renders as "ab".
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
34
35fn parse_err(id: &str, e: impl std::fmt::Display) -> AxError {
36 AxError::Parse {
37 format: id.to_string(),
38 message: e.to_string(),
39 }
40}
41
42use apache_avro::types::Value as AvroValue;
45
/// Format parser for Avro input.
///
/// A stateless unit type; its behavior lives in its `FormatParser`
/// implementation.
#[derive(Clone, Debug, Default)]
pub struct AvroParser;
48
49fn avro_to_value(value: &AvroValue) -> Value {
51 match value {
52 AvroValue::Null => Value::Null,
53 AvroValue::Boolean(b) => Value::Bool(*b),
54 AvroValue::Int(i) => Value::Int(i64::from(*i)),
55 AvroValue::Long(i) => Value::Int(*i),
56 AvroValue::Float(f) => finite_float(f64::from(*f)),
57 AvroValue::Double(f) => finite_float(*f),
58 AvroValue::String(s) => Value::Str(s.clone()),
59 AvroValue::Enum(_, s) => Value::Str(s.clone()),
60 AvroValue::Bytes(b) | AvroValue::Fixed(_, b) => Value::Str(hex(b)),
61 AvroValue::Union(_, inner) => avro_to_value(inner),
62 AvroValue::Date(d) => Value::Int(i64::from(*d)),
63 AvroValue::TimeMillis(t) => Value::Int(i64::from(*t)),
64 AvroValue::TimeMicros(t)
65 | AvroValue::TimestampMillis(t)
66 | AvroValue::TimestampMicros(t)
67 | AvroValue::TimestampNanos(t)
68 | AvroValue::LocalTimestampMillis(t)
69 | AvroValue::LocalTimestampMicros(t)
70 | AvroValue::LocalTimestampNanos(t) => Value::Int(*t),
71 _ => Value::Null,
73 }
74}
75
76impl FormatParser for AvroParser {
77 fn id(&self) -> &'static str {
78 "avro"
79 }
80 fn extensions(&self) -> &'static [&'static str] {
81 &["avro"]
82 }
83 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
84 bytes.starts_with(b"Obj\x01").then_some(MAGIC)
86 }
87 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
88 let reader = apache_avro::Reader::new(bytes).map_err(|e| parse_err(self.id(), e))?;
89 let mut builder = TableBuilder::new();
90 for record in reader {
91 let value = record.map_err(|e| parse_err(self.id(), e))?;
92 let mut row: BTreeMap<String, Value> = BTreeMap::new();
93 match value {
94 AvroValue::Record(fields) => {
95 for (name, field) in fields {
96 row.insert(name, avro_to_value(&field));
97 }
98 }
99 other => {
100 row.insert("value".to_string(), avro_to_value(&other));
101 }
102 }
103 builder.push_row(row);
104 }
105 Ok(builder.finish())
106 }
107}
108
/// Format parser for ORC input.
///
/// A stateless unit type; its behavior lives in its `FormatParser`
/// implementation.
#[derive(Clone, Debug, Default)]
pub struct OrcParser;
113
114fn orc_cell(array: &dyn arrow::array::Array, row: usize) -> Value {
117 if array.is_null(row) {
118 return Value::Null;
119 }
120 match arrow::util::display::array_value_to_string(array, row) {
121 Ok(s) => infer::infer_scalar(&s),
122 Err(_) => Value::Null,
123 }
124}
125
126impl FormatParser for OrcParser {
127 fn id(&self) -> &'static str {
128 "orc"
129 }
130 fn extensions(&self) -> &'static [&'static str] {
131 &["orc"]
132 }
133 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
134 bytes.starts_with(b"ORC").then_some(MAGIC)
136 }
137 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
138 let cursor = bytes::Bytes::from(bytes.to_vec());
139 let reader = orc_rust::ArrowReaderBuilder::try_new(cursor)
140 .map_err(|e| parse_err(self.id(), e))?
141 .build();
142 let mut builder = TableBuilder::new();
143 for batch in reader {
144 let batch = batch.map_err(|e| parse_err(self.id(), e))?;
145 let schema = batch.schema();
146 for row in 0..batch.num_rows() {
147 let mut record: BTreeMap<String, Value> = BTreeMap::new();
148 for (i, field) in schema.fields().iter().enumerate() {
149 record.insert(
150 field.name().clone(),
151 orc_cell(batch.column(i).as_ref(), row),
152 );
153 }
154 builder.push_row(record);
155 }
156 }
157 Ok(builder.finish())
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Returns the column called `name`, panicking if `cols` has no such column.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    /// Builds an in-memory Avro buffer holding two four-field records.
    fn build_avro() -> Vec<u8> {
        use apache_avro::{types::Record, Schema, Writer};

        const SCHEMA_JSON: &str = r#"{"type":"record","name":"r","fields":[
            {"name":"id","type":"long"},
            {"name":"host","type":"string"},
            {"name":"score","type":"double"},
            {"name":"ok","type":"boolean"}]}"#;

        let schema = Schema::parse_str(SCHEMA_JSON).unwrap();
        let mut writer = Writer::new(&schema, Vec::new());
        let samples = [(1i64, "a", 9.5f64, true), (2, "b", 3.25, false)];
        for (id, host, score, ok) in samples {
            let mut record = Record::new(writer.schema()).unwrap();
            record.put("id", id);
            record.put("host", host);
            record.put("score", score);
            record.put("ok", ok);
            writer.append(record).unwrap();
        }
        writer.into_inner().unwrap()
    }

    /// Parsing the sample Avro buffer yields typed columns with the written values.
    #[test]
    fn avro_records_become_typed_rows() {
        let cols = AvroParser.parse("data.avro", &build_avro()).unwrap();

        let id = col(&cols, "id");
        assert_eq!(id.ty, ColType::Int);
        assert_eq!(id.cells, vec![Value::Int(1), Value::Int(2)]);

        let host = col(&cols, "host");
        assert_eq!(
            host.cells,
            vec![Value::Str("a".into()), Value::Str("b".into())]
        );

        let score = col(&cols, "score");
        assert_eq!(score.numeric(), vec![9.5, 3.25]);

        let ok = col(&cols, "ok");
        assert_eq!(ok.cells, vec![Value::Bool(true), Value::Bool(false)]);
    }

    /// Checks `avro_to_value` variant by variant against the expected cell.
    #[test]
    fn avro_to_value_units() {
        let cases = [
            (AvroValue::Null, Value::Null),
            (AvroValue::Boolean(true), Value::Bool(true)),
            (AvroValue::Int(5), Value::Int(5)),
            (AvroValue::Long(9), Value::Int(9)),
            (AvroValue::Float(1.5), Value::Float(1.5)),
            (AvroValue::Double(2.5), Value::Float(2.5)),
            (AvroValue::Double(f64::NAN), Value::Null),
            (AvroValue::String("x".into()), Value::Str("x".into())),
            (AvroValue::Enum(0, "GET".into()), Value::Str("GET".into())),
            (
                AvroValue::Bytes(vec![0x00, 0xab]),
                Value::Str("00ab".into()),
            ),
            (
                AvroValue::Union(1, Box::new(AvroValue::Long(7))),
                Value::Int(7),
            ),
            (AvroValue::Date(19000), Value::Int(19000)),
            (AvroValue::TimeMillis(500), Value::Int(500)),
            (AvroValue::TimestampMillis(1234), Value::Int(1234)),
            (AvroValue::Array(vec![]), Value::Null),
            (AvroValue::Record(vec![]), Value::Null),
        ];

        for (input, expected) in cases {
            assert_eq!(avro_to_value(&input), expected, "converting {input:?}");
        }
    }

    /// Garbage input is a parse error; sniffing keys off the `Obj\x01` prefix.
    #[test]
    fn avro_malformed_and_sniff() {
        let outcome = AvroParser.parse("data.avro", b"not avro");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));

        assert_eq!(AvroParser.sniff(&build_avro()), Some(MAGIC));
        assert_eq!(AvroParser.sniff(b"Obj\x01...."), Some(MAGIC));
        assert_eq!(AvroParser.sniff(b"ORC"), None);
        assert_eq!(AvroParser.sniff(b"{}"), None);
        assert_eq!(AvroParser.extensions(), &["avro"]);
    }

    /// Writes a single record batch into an in-memory ORC buffer.
    fn orc_bytes(batch: &arrow::record_batch::RecordBatch) -> Vec<u8> {
        let mut buf: Vec<u8> = Vec::new();
        {
            let mut writer = orc_rust::ArrowWriterBuilder::new(&mut buf, batch.schema())
                .try_build()
                .unwrap();
            writer.write(batch).unwrap();
            writer.close().unwrap();
        }
        buf
    }

    /// Builds an in-memory ORC buffer with three rows of `id`, `host`, `score`.
    fn build_orc() -> Vec<u8> {
        use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray};
        use arrow::record_batch::RecordBatch;
        use std::sync::Arc;

        let ids = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
        let hosts = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
        let scores = Arc::new(Float64Array::from(vec![9.5, 3.25, 7.5])) as ArrayRef;

        let batch =
            RecordBatch::try_from_iter(vec![("id", ids), ("host", hosts), ("score", scores)])
                .unwrap();
        orc_bytes(&batch)
    }

    /// Parsing the sample ORC buffer yields typed columns with the written values.
    #[test]
    fn orc_rows_are_type_inferred() {
        let cols = OrcParser.parse("data.orc", &build_orc()).unwrap();

        let id = col(&cols, "id");
        assert_eq!(id.ty, ColType::Int);
        assert_eq!(id.cells, vec![Value::Int(1), Value::Int(2), Value::Int(3)]);

        let host = col(&cols, "host");
        assert_eq!(
            host.cells,
            vec![
                Value::Str("a".into()),
                Value::Str("b".into()),
                Value::Str("c".into())
            ]
        );

        let score = col(&cols, "score");
        assert_eq!(score.numeric(), vec![9.5, 3.25, 7.5]);
    }

    /// A null slot in an ORC column comes back as `Value::Null`.
    #[test]
    fn orc_null_cell() {
        use arrow::array::{ArrayRef, Int64Array};
        use arrow::record_batch::RecordBatch;
        use std::sync::Arc;

        let values = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)])) as ArrayRef;
        let batch = RecordBatch::try_from_iter(vec![("v", values)]).unwrap();
        let encoded = orc_bytes(&batch);

        let cols = OrcParser.parse("-", &encoded).unwrap();
        assert_eq!(col(&cols, "v").cells[1], Value::Null);
    }

    /// Garbage input is a parse error; sniffing keys off the `ORC` prefix.
    #[test]
    fn orc_malformed_and_sniff() {
        let outcome = OrcParser.parse("data.orc", b"not orc at all.....");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));

        assert_eq!(OrcParser.sniff(&build_orc()), Some(MAGIC));
        assert_eq!(OrcParser.sniff(b"ORC....."), Some(MAGIC));
        assert_eq!(OrcParser.sniff(b"Obj\x01"), None);
        assert_eq!(OrcParser.extensions(), &["orc"]);
    }

    /// The default registry resolves both parsers from a file extension and
    /// from the content of an unnamed (`"-"`) source.
    #[test]
    fn resolve_by_extension_and_magic() {
        let registry = crate::parser::ParserRegistry::default();
        let avro_file = build_avro();
        let orc_file = build_orc();

        assert_eq!(registry.resolve("x.avro", b"zz").unwrap().id(), "avro");
        assert_eq!(registry.resolve("x.orc", b"zz").unwrap().id(), "orc");
        assert_eq!(registry.resolve("-", &avro_file).unwrap().id(), "avro");
        assert_eq!(registry.resolve("-", &orc_file).unwrap().id(), "orc");
    }
}