ailake_catalog/avro_raw.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2// Minimal Avro Object Container File writer that preserves custom schema
3// properties (like "field-id") needed by PyIceberg.
4//
5// apache-avro 0.16 strips unknown schema properties when serializing back to
6// JSON, breaking PyIceberg's avro_schema_to_iceberg conversion. This writer
7// embeds the schema JSON verbatim, then uses zigzag-encoded binary for records.
8
9// ---------------------------------------------------------------------------
10// Encoding primitives
11// ---------------------------------------------------------------------------
12
/// Append `n` to `buf` as a zigzag-mapped, base-128 variable-length integer.
///
/// The signed value is first folded onto an unsigned one so that small
/// magnitudes of either sign stay small (`0, -1, 1, -2, …` map to
/// `0, 1, 2, 3, …`). The result is then emitted seven bits at a time, least
/// significant group first; every byte except the last has its high bit set
/// to signal that more bytes follow.
pub fn encode_long(n: i64, buf: &mut Vec<u8>) {
    // Zigzag fold: the arithmetic shift spreads the sign bit over all 64 bits.
    let mut rest = ((n << 1) ^ (n >> 63)) as u64;

    // Emit continuation bytes while more than seven significant bits remain.
    while rest >= 0x80 {
        buf.push((rest & 0x7F) as u8 | 0x80);
        rest >>= 7;
    }

    // Final byte: high bit clear marks the end of the number.
    buf.push(rest as u8);
}
25
26pub fn encode_int(n: i32, buf: &mut Vec<u8>) {
27 encode_long(n as i64, buf);
28}
29
30pub fn encode_string(s: &str, buf: &mut Vec<u8>) {
31 encode_long(s.len() as i64, buf);
32 buf.extend_from_slice(s.as_bytes());
33}
34
35pub fn encode_bytes_field(b: &[u8], buf: &mut Vec<u8>) {
36 encode_long(b.len() as i64, buf);
37 buf.extend_from_slice(b);
38}
39
40/// Encode a union as `null` (index 0).
41pub fn encode_union_null(buf: &mut Vec<u8>) {
42 encode_long(0, buf);
43}
44
45/// Encode a union as the variant at `index` with a long payload.
46pub fn encode_union_long(index: i64, val: i64, buf: &mut Vec<u8>) {
47 encode_long(index, buf);
48 encode_long(val, buf);
49}
50
51/// Encode a union as the variant at `index` with a bytes payload.
52pub fn encode_union_bytes(index: i64, val: &[u8], buf: &mut Vec<u8>) {
53 encode_long(index, buf);
54 encode_bytes_field(val, buf);
55}
56
57/// Encode an empty array.
58pub fn encode_empty_array(buf: &mut Vec<u8>) {
59 encode_long(0, buf);
60}
61
62// ---------------------------------------------------------------------------
63// Avro Object Container File
64// ---------------------------------------------------------------------------
65
/// Four-byte signature that opens the container file: ASCII `Obj` then `0x01`.
const AVRO_MAGIC: &[u8] = b"Obj\x01";

/// Fixed 16-byte sync marker: the same eight-byte pattern written twice.
const SYNC_MARKER: &[u8] = &[
    0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, // bytes 0..8
    0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, // bytes 8..16
];
70
71/// Write an Avro Object Container File with the given schema JSON (verbatim)
72/// and pre-encoded binary records.
73///
74/// `schema_json` is embedded in the `avro.schema` file metadata entry exactly
75/// as given, so properties like `"field-id"` survive the round-trip.
76///
77/// `extra_meta` allows callers to inject additional Avro file metadata entries
78/// (e.g. Iceberg manifest fields: schema, partition-spec, format-version).
79pub fn write_avro_container(
80 schema_json: &str,
81 extra_meta: &[(&str, &[u8])],
82 records: &[Vec<u8>],
83) -> Vec<u8> {
84 let mut buf = Vec::new();
85
86 // Magic
87 buf.extend_from_slice(AVRO_MAGIC);
88
89 // File metadata (Avro map): 2 base entries + extra_meta in one block
90 encode_long((2 + extra_meta.len()) as i64, &mut buf);
91
92 // Entry 1: avro.schema → schema_json bytes
93 encode_string("avro.schema", &mut buf);
94 encode_bytes_field(schema_json.as_bytes(), &mut buf);
95
96 // Entry 2: avro.codec → "null"
97 encode_string("avro.codec", &mut buf);
98 encode_bytes_field(b"null", &mut buf);
99
100 // Extra entries (e.g. Iceberg manifest metadata)
101 for (key, val) in extra_meta {
102 encode_string(key, &mut buf);
103 encode_bytes_field(val, &mut buf);
104 }
105
106 // End of map
107 encode_long(0, &mut buf);
108
109 // Sync marker
110 buf.extend_from_slice(SYNC_MARKER);
111
112 // Data block (one block for all records).
113 // The file ends after the sync marker — apache-avro's Reader detects EOF when
114 // trying to read the next block count, which it treats as clean end-of-stream.
115 // Writing an explicit count=0 terminator causes the Reader to then try to read
116 // a byte_count after the zero, hitting EOF and returning an error instead.
117 if !records.is_empty() {
118 let block: Vec<u8> = records.iter().flat_map(|r| r.iter().copied()).collect();
119 encode_long(records.len() as i64, &mut buf); // object count
120 encode_long(block.len() as i64, &mut buf); // byte count
121 buf.extend_from_slice(&block);
122 buf.extend_from_slice(SYNC_MARKER);
123 }
124
125 buf
126}