// anomalyx_normalize/parsers/avro.rs

//! Avro and ORC parsers — the data-lake siblings of Parquet.
//!
//! Both lower to the same engine-independent [`Column`]s as the Parquet/Arrow
//! parsers, so no library type escapes the contract.
//!
//! - **Avro** (`apache-avro`): each record in the object-container file is a row;
//!   record fields become typed columns. Bytes/fixed → hex `Str`; dates/times →
//!   their integer value; nested records/arrays/maps, decimals, UUIDs and
//!   durations are recorded as `Null` (honest absence — v1 lowers only flat
//!   scalar columns).
//! - **ORC** (`orc-rust` → Arrow): the file is read into Arrow batches; each cell
//!   is rendered and type-inferred into the closed [`Value`] set.
//!
//! Detected by their binary magic (`Obj\x01` / `ORC`); extensions `.avro` /
//! `.orc`. Behind the default-on `datalake` feature.
15
16use crate::infer;
17use crate::parser::{Confidence, FormatParser, MAGIC};
18use crate::table::TableBuilder;
19use ax_core::{AxError, Column, Value};
20use std::collections::BTreeMap;
21
22/// A finite float becomes `Float`; NaN/±∞ become `Null` (honest absence).
23fn finite_float(f: f64) -> Value {
24    if f.is_finite() {
25        Value::Float(f)
26    } else {
27        Value::Null
28    }
29}
30
/// Renders `bytes` as lowercase hexadecimal, two digits per byte.
///
/// An empty slice yields an empty string. The output buffer is sized once up
/// front and filled from a nibble lookup table, so no per-byte temporary
/// `String` is created.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble; both indices are always < 16.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
34
35fn parse_err(id: &str, e: impl std::fmt::Display) -> AxError {
36    AxError::Parse {
37        format: id.to_string(),
38        message: e.to_string(),
39    }
40}
41
42// ----------------------------------------------------------------- Avro -------
43
44use apache_avro::types::Value as AvroValue;
45
/// Parser for Avro object-container files.
///
/// A stateless unit type: all values are interchangeable, so it derives the
/// common comparison/copy traits in addition to `Debug`, `Default` and `Clone`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AvroParser;
48
49/// Maps an Avro value to the closed [`Value`] set.
50fn avro_to_value(value: &AvroValue) -> Value {
51    match value {
52        AvroValue::Null => Value::Null,
53        AvroValue::Boolean(b) => Value::Bool(*b),
54        AvroValue::Int(i) => Value::Int(i64::from(*i)),
55        AvroValue::Long(i) => Value::Int(*i),
56        AvroValue::Float(f) => finite_float(f64::from(*f)),
57        AvroValue::Double(f) => finite_float(*f),
58        AvroValue::String(s) => Value::Str(s.clone()),
59        AvroValue::Enum(_, s) => Value::Str(s.clone()),
60        AvroValue::Bytes(b) | AvroValue::Fixed(_, b) => Value::Str(hex(b)),
61        AvroValue::Union(_, inner) => avro_to_value(inner),
62        AvroValue::Date(d) => Value::Int(i64::from(*d)),
63        AvroValue::TimeMillis(t) => Value::Int(i64::from(*t)),
64        AvroValue::TimeMicros(t)
65        | AvroValue::TimestampMillis(t)
66        | AvroValue::TimestampMicros(t)
67        | AvroValue::TimestampNanos(t)
68        | AvroValue::LocalTimestampMillis(t)
69        | AvroValue::LocalTimestampMicros(t)
70        | AvroValue::LocalTimestampNanos(t) => Value::Int(*t),
71        // Records, arrays, maps, decimals, uuids, durations are not flat scalars.
72        _ => Value::Null,
73    }
74}
75
76impl FormatParser for AvroParser {
77    fn id(&self) -> &'static str {
78        "avro"
79    }
80    fn extensions(&self) -> &'static [&'static str] {
81        &["avro"]
82    }
83    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
84        // Avro object-container file magic.
85        bytes.starts_with(b"Obj\x01").then_some(MAGIC)
86    }
87    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
88        let reader = apache_avro::Reader::new(bytes).map_err(|e| parse_err(self.id(), e))?;
89        let mut builder = TableBuilder::new();
90        for record in reader {
91            let value = record.map_err(|e| parse_err(self.id(), e))?;
92            let mut row: BTreeMap<String, Value> = BTreeMap::new();
93            match value {
94                AvroValue::Record(fields) => {
95                    for (name, field) in fields {
96                        row.insert(name, avro_to_value(&field));
97                    }
98                }
99                other => {
100                    row.insert("value".to_string(), avro_to_value(&other));
101                }
102            }
103            builder.push_row(row);
104        }
105        Ok(builder.finish())
106    }
107}
108
109// ------------------------------------------------------------------ ORC -------
110
/// Parser for ORC files, read through Arrow record batches.
///
/// A stateless unit type: all values are interchangeable, so it derives the
/// common comparison/copy traits in addition to `Debug`, `Default` and `Clone`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OrcParser;
113
114/// Renders one Arrow cell into the closed [`Value`] set (null-aware; the string
115/// rendering is type-inferred so numbers/bools become typed columns).
116fn orc_cell(array: &dyn arrow::array::Array, row: usize) -> Value {
117    if array.is_null(row) {
118        return Value::Null;
119    }
120    match arrow::util::display::array_value_to_string(array, row) {
121        Ok(s) => infer::infer_scalar(&s),
122        Err(_) => Value::Null,
123    }
124}
125
126impl FormatParser for OrcParser {
127    fn id(&self) -> &'static str {
128        "orc"
129    }
130    fn extensions(&self) -> &'static [&'static str] {
131        &["orc"]
132    }
133    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
134        // ORC files begin with the 3-byte magic "ORC".
135        bytes.starts_with(b"ORC").then_some(MAGIC)
136    }
137    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
138        let cursor = bytes::Bytes::from(bytes.to_vec());
139        let reader = orc_rust::ArrowReaderBuilder::try_new(cursor)
140            .map_err(|e| parse_err(self.id(), e))?
141            .build();
142        let mut builder = TableBuilder::new();
143        for batch in reader {
144            let batch = batch.map_err(|e| parse_err(self.id(), e))?;
145            let schema = batch.schema();
146            for row in 0..batch.num_rows() {
147                let mut record: BTreeMap<String, Value> = BTreeMap::new();
148                for (i, field) in schema.fields().iter().enumerate() {
149                    record.insert(
150                        field.name().clone(),
151                        orc_cell(batch.column(i).as_ref(), row),
152                    );
153                }
154                builder.push_row(record);
155            }
156        }
157        Ok(builder.finish())
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Looks up a parsed column by name, panicking with the name when absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    /// Shorthand for an owned string cell.
    fn text(s: &str) -> Value {
        Value::Str(s.into())
    }

    /// Serializes one Arrow batch into an in-memory ORC file.
    fn write_orc(batch: &arrow::record_batch::RecordBatch) -> Vec<u8> {
        let mut buf: Vec<u8> = Vec::new();
        {
            // Scoped so the writer's borrow of `buf` ends before it is returned.
            let mut writer = orc_rust::ArrowWriterBuilder::new(&mut buf, batch.schema())
                .try_build()
                .unwrap();
            writer.write(batch).unwrap();
            writer.close().unwrap();
        }
        buf
    }

    // ----------------------------------------------------------- Avro -------

    /// Writes a tiny Avro object-container file in-memory (no committed fixture).
    fn build_avro() -> Vec<u8> {
        use apache_avro::{types::Record, Schema, Writer};
        const SCHEMA_JSON: &str = r#"{"type":"record","name":"r","fields":[
                {"name":"id","type":"long"},
                {"name":"host","type":"string"},
                {"name":"score","type":"double"},
                {"name":"ok","type":"boolean"}]}"#;
        let schema = Schema::parse_str(SCHEMA_JSON).unwrap();
        let mut writer = Writer::new(&schema, Vec::new());
        let rows = [(1i64, "a", 9.5f64, true), (2, "b", 3.25, false)];
        for (id, host, score, ok) in rows {
            let mut rec = Record::new(writer.schema()).unwrap();
            rec.put("id", id);
            rec.put("host", host);
            rec.put("score", score);
            rec.put("ok", ok);
            writer.append(rec).unwrap();
        }
        writer.into_inner().unwrap()
    }

    #[test]
    fn avro_records_become_typed_rows() {
        let cols = AvroParser.parse("data.avro", &build_avro()).unwrap();

        let ids = col(&cols, "id");
        assert_eq!(ids.ty, ColType::Int);
        assert_eq!(ids.cells, vec![Value::Int(1), Value::Int(2)]);

        assert_eq!(col(&cols, "host").cells, vec![text("a"), text("b")]);
        assert_eq!(col(&cols, "score").numeric(), vec![9.5, 3.25]);
        assert_eq!(
            col(&cols, "ok").cells,
            vec![Value::Bool(true), Value::Bool(false)]
        );
    }

    #[test]
    fn avro_to_value_units() {
        let cases = [
            (AvroValue::Null, Value::Null),
            (AvroValue::Boolean(true), Value::Bool(true)),
            (AvroValue::Int(5), Value::Int(5)),
            (AvroValue::Long(9), Value::Int(9)),
            (AvroValue::Float(1.5), Value::Float(1.5)),
            (AvroValue::Double(2.5), Value::Float(2.5)),
            (AvroValue::Double(f64::NAN), Value::Null),
            (AvroValue::String("x".into()), text("x")),
            (AvroValue::Enum(0, "GET".into()), text("GET")),
            (AvroValue::Bytes(vec![0x00, 0xab]), text("00ab")),
            // A union unwraps to its held value (the common nullable shape).
            (
                AvroValue::Union(1, Box::new(AvroValue::Long(7))),
                Value::Int(7),
            ),
            (AvroValue::Date(19000), Value::Int(19000)),
            (AvroValue::TimeMillis(500), Value::Int(500)),
            (AvroValue::TimestampMillis(1234), Value::Int(1234)),
            // Complex/non-scalar values are recorded as absent.
            (AvroValue::Array(vec![]), Value::Null),
            (AvroValue::Record(vec![]), Value::Null),
        ];
        for (input, expected) in cases {
            assert_eq!(avro_to_value(&input), expected, "input: {input:?}");
        }
    }

    #[test]
    fn avro_malformed_and_sniff() {
        let outcome = AvroParser.parse("data.avro", b"not avro");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));

        assert_eq!(AvroParser.sniff(&build_avro()), Some(MAGIC));
        assert_eq!(AvroParser.sniff(b"Obj\x01...."), Some(MAGIC));
        for foreign in [&b"ORC"[..], &b"{}"[..]] {
            assert_eq!(AvroParser.sniff(foreign), None);
        }
        assert_eq!(AvroParser.extensions(), &["avro"]);
    }

    // ------------------------------------------------------------ ORC -------

    /// Writes a tiny ORC file in-memory via the orc-rust Arrow writer.
    fn build_orc() -> Vec<u8> {
        use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray};
        use arrow::record_batch::RecordBatch;
        use std::sync::Arc;

        let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let hosts: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let scores: ArrayRef = Arc::new(Float64Array::from(vec![9.5, 3.25, 7.5]));
        let batch =
            RecordBatch::try_from_iter(vec![("id", ids), ("host", hosts), ("score", scores)])
                .unwrap();
        write_orc(&batch)
    }

    #[test]
    fn orc_rows_are_type_inferred() {
        let cols = OrcParser.parse("data.orc", &build_orc()).unwrap();

        let ids = col(&cols, "id");
        assert_eq!(ids.ty, ColType::Int);
        assert_eq!(
            ids.cells,
            vec![Value::Int(1), Value::Int(2), Value::Int(3)]
        );

        assert_eq!(
            col(&cols, "host").cells,
            vec![text("a"), text("b"), text("c")]
        );
        assert_eq!(col(&cols, "score").numeric(), vec![9.5, 3.25, 7.5]);
    }

    #[test]
    fn orc_null_cell() {
        use arrow::array::{ArrayRef, Int64Array};
        use arrow::record_batch::RecordBatch;
        use std::sync::Arc;

        let values: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));
        let batch = RecordBatch::try_from_iter(vec![("v", values)]).unwrap();
        let cols = OrcParser.parse("-", &write_orc(&batch)).unwrap();
        assert_eq!(col(&cols, "v").cells[1], Value::Null);
    }

    #[test]
    fn orc_malformed_and_sniff() {
        let outcome = OrcParser.parse("data.orc", b"not orc at all.....");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));

        assert_eq!(OrcParser.sniff(&build_orc()), Some(MAGIC));
        assert_eq!(OrcParser.sniff(b"ORC....."), Some(MAGIC));
        assert_eq!(OrcParser.sniff(b"Obj\x01"), None);
        assert_eq!(OrcParser.extensions(), &["orc"]);
    }

    #[test]
    fn resolve_by_extension_and_magic() {
        let reg = crate::parser::ParserRegistry::default();
        // (source name, payload, expected parser id): first two resolve by
        // extension, the last two by magic bytes alone.
        let cases = [
            ("x.avro", b"zz".to_vec(), "avro"),
            ("x.orc", b"zz".to_vec(), "orc"),
            ("-", build_avro(), "avro"),
            ("-", build_orc(), "orc"),
        ];
        for (source, payload, want) in cases {
            assert_eq!(reg.resolve(source, &payload).unwrap().id(), want);
        }
    }
}