Skip to main content

ailake_catalog/
avro_raw.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2// Minimal Avro Object Container File writer that preserves custom schema
3// properties (like "field-id") needed by PyIceberg.
4//
5// apache-avro 0.16 strips unknown schema properties when serializing back to
6// JSON, breaking PyIceberg's avro_schema_to_iceberg conversion. This writer
7// embeds the schema JSON verbatim, then uses zigzag-encoded binary for records.
8
9// ---------------------------------------------------------------------------
10// Encoding primitives
11// ---------------------------------------------------------------------------
12
/// Append `n` to `buf` as a zigzag-mapped, variable-length integer.
///
/// The value is first zigzag-mapped to an unsigned integer (0, -1, 1, -2, …
/// become 0, 1, 2, 3, …) and then emitted 7 bits at a time, least-significant
/// group first. Every byte except the last has its high bit (`0x80`) set to
/// signal that more bytes follow.
pub fn encode_long(n: i64, buf: &mut Vec<u8>) {
    // Zigzag: the arithmetic shift smears the sign bit across all 64 bits, so
    // the XOR flips the doubled value for negative inputs.
    let mut rest = ((n << 1) ^ (n >> 63)) as u64;

    // Emit continuation bytes while more than 7 significant bits remain.
    while rest >= 0x80 {
        buf.push((rest & 0x7F) as u8 | 0x80);
        rest >>= 7;
    }

    // Final byte: fits in 7 bits, so the continuation flag stays clear.
    buf.push(rest as u8);
}
25
26pub fn encode_int(n: i32, buf: &mut Vec<u8>) {
27    encode_long(n as i64, buf);
28}
29
30pub fn encode_string(s: &str, buf: &mut Vec<u8>) {
31    encode_long(s.len() as i64, buf);
32    buf.extend_from_slice(s.as_bytes());
33}
34
35pub fn encode_bytes_field(b: &[u8], buf: &mut Vec<u8>) {
36    encode_long(b.len() as i64, buf);
37    buf.extend_from_slice(b);
38}
39
40/// Encode a union as `null` (index 0).
41pub fn encode_union_null(buf: &mut Vec<u8>) {
42    encode_long(0, buf);
43}
44
45/// Encode a union as the variant at `index` with a long payload.
46pub fn encode_union_long(index: i64, val: i64, buf: &mut Vec<u8>) {
47    encode_long(index, buf);
48    encode_long(val, buf);
49}
50
51/// Encode a union as the variant at `index` with a bytes payload.
52pub fn encode_union_bytes(index: i64, val: &[u8], buf: &mut Vec<u8>) {
53    encode_long(index, buf);
54    encode_bytes_field(val, buf);
55}
56
57/// Encode an empty array.
58pub fn encode_empty_array(buf: &mut Vec<u8>) {
59    encode_long(0, buf);
60}
61
62// ---------------------------------------------------------------------------
63// Avro Object Container File
64// ---------------------------------------------------------------------------
65
/// Four magic bytes written at the very start of the container file:
/// ASCII `Obj` followed by the byte `0x01`.
const AVRO_MAGIC: &[u8] = b"Obj\x01";

/// Fixed 16-byte sync marker. It is written once after the file header and
/// once more after the data block.
const SYNC_MARKER: &[u8] = &[
    0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, // first 8 bytes
    0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, // same pattern repeated
];
70
71/// Write an Avro Object Container File with the given schema JSON (verbatim)
72/// and pre-encoded binary records.
73///
74/// `schema_json` is embedded in the `avro.schema` file metadata entry exactly
75/// as given, so properties like `"field-id"` survive the round-trip.
76///
77/// `extra_meta` allows callers to inject additional Avro file metadata entries
78/// (e.g. Iceberg manifest fields: schema, partition-spec, format-version).
79pub fn write_avro_container(
80    schema_json: &str,
81    extra_meta: &[(&str, &[u8])],
82    records: &[Vec<u8>],
83) -> Vec<u8> {
84    let mut buf = Vec::new();
85
86    // Magic
87    buf.extend_from_slice(AVRO_MAGIC);
88
89    // File metadata (Avro map): 2 base entries + extra_meta in one block
90    encode_long((2 + extra_meta.len()) as i64, &mut buf);
91
92    // Entry 1: avro.schema → schema_json bytes
93    encode_string("avro.schema", &mut buf);
94    encode_bytes_field(schema_json.as_bytes(), &mut buf);
95
96    // Entry 2: avro.codec → "null"
97    encode_string("avro.codec", &mut buf);
98    encode_bytes_field(b"null", &mut buf);
99
100    // Extra entries (e.g. Iceberg manifest metadata)
101    for (key, val) in extra_meta {
102        encode_string(key, &mut buf);
103        encode_bytes_field(val, &mut buf);
104    }
105
106    // End of map
107    encode_long(0, &mut buf);
108
109    // Sync marker
110    buf.extend_from_slice(SYNC_MARKER);
111
112    // Data block (one block for all records).
113    // The file ends after the sync marker — apache-avro's Reader detects EOF when
114    // trying to read the next block count, which it treats as clean end-of-stream.
115    // Writing an explicit count=0 terminator causes the Reader to then try to read
116    // a byte_count after the zero, hitting EOF and returning an error instead.
117    if !records.is_empty() {
118        let block: Vec<u8> = records.iter().flat_map(|r| r.iter().copied()).collect();
119        encode_long(records.len() as i64, &mut buf); // object count
120        encode_long(block.len() as i64, &mut buf); // byte count
121        buf.extend_from_slice(&block);
122        buf.extend_from_slice(SYNC_MARKER);
123    }
124
125    buf
126}