/// Appends `n` to `buf` as a zig-zag, variable-length integer.
///
/// The value is first zig-zag mapped (`(n << 1) ^ (n >> 63)`) so that small
/// magnitudes of either sign become small unsigned numbers. The result is then
/// emitted seven bits at a time, least-significant group first; every byte
/// except the last carries the continuation flag `0x80`.
///
/// Between one and ten bytes are appended; existing contents of `buf` are kept.
pub fn encode_long(n: i64, buf: &mut Vec<u8>) {
    let mut rest = ((n << 1) ^ (n >> 63)) as u64;

    // While more than seven significant bits remain, emit a continuation byte.
    while rest > 0x7F {
        buf.push((rest & 0x7F) as u8 | 0x80);
        rest >>= 7;
    }

    // The final group fits in seven bits and is written without the flag.
    buf.push(rest as u8);
}
/// Appends a 32-bit integer to `buf`.
///
/// The value is sign-extended to `i64` and written by [`encode_long`], so it
/// produces exactly the bytes a long with the same numeric value would.
pub fn encode_int(n: i32, buf: &mut Vec<u8>) {
    encode_long(i64::from(n), buf);
}
/// Appends a length-prefixed string to `buf`.
///
/// The prefix is the string's UTF-8 byte length (not its character count),
/// written with `encode_long`; the raw UTF-8 bytes follow immediately.
pub fn encode_string(s: &str, buf: &mut Vec<u8>) {
    let utf8 = s.as_bytes();
    encode_long(utf8.len() as i64, buf);
    buf.extend_from_slice(utf8);
}
/// Appends a length-prefixed byte sequence to `buf`.
///
/// The length of `b` is written first with `encode_long`, then the bytes of
/// `b` are copied verbatim.
pub fn encode_bytes_field(b: &[u8], buf: &mut Vec<u8>) {
    let byte_count = b.len() as i64;
    encode_long(byte_count, buf);
    buf.extend_from_slice(b);
}
/// Appends the union branch index `0` to `buf` and nothing else.
///
/// NOTE(review): this presumes the null branch is the first branch of the
/// union being written — confirm against the schema used by callers.
pub fn encode_union_null(buf: &mut Vec<u8>) {
    const NULL_BRANCH: i64 = 0;
    encode_long(NULL_BRANCH, buf);
}
pub fn encode_union_long(index: i64, val: i64, buf: &mut Vec<u8>) {
encode_long(index, buf);
encode_long(val, buf);
}
/// Appends a union value whose selected branch holds a byte sequence.
///
/// Writes `index` (the branch selector) with `encode_long`, then `val` as a
/// length-prefixed byte sequence: its length as a long, followed by the bytes.
pub fn encode_union_bytes(index: i64, val: &[u8], buf: &mut Vec<u8>) {
    // Branch selector.
    encode_long(index, buf);

    // Payload: length prefix, then the raw bytes.
    encode_long(val.len() as i64, buf);
    buf.extend_from_slice(val);
}
/// Appends an array with no items to `buf`.
///
/// Only a single long `0` is written, i.e. a block count of zero with no
/// item data after it.
pub fn encode_empty_array(buf: &mut Vec<u8>) {
    let item_count: i64 = 0;
    encode_long(item_count, buf);
}
/// Four-byte file magic: ASCII `Obj` followed by the byte `0x01`.
const AVRO_MAGIC: &[u8] = b"Obj\x01";

/// Fixed 16-byte sync marker (the eight bytes `DE AD BE EF CA FE BA BE`, twice).
const SYNC_MARKER: &[u8] = &[
    0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, // first half
    0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE, // second half
];
/// Builds an Avro object container file in memory and returns its bytes.
///
/// The output is laid out in this order:
///
/// 1. `AVRO_MAGIC`.
/// 2. The file metadata map, written as a single block: an entry count of
///    `2 + extra_meta.len()`, the `avro.schema` entry holding `schema_json`,
///    the `avro.codec` entry holding `null`, every `extra_meta` pair in the
///    order given, and finally a zero count that ends the map.
/// 3. `SYNC_MARKER`.
/// 4. Only when `records` is non-empty, one data block: the number of records,
///    the combined byte length of all records, the record bytes back to back,
///    and `SYNC_MARKER` again.
///
/// Each element of `records` is copied verbatim, so it must already be an
/// encoded record. Keys in `extra_meta` are written as given; they are not
/// checked against the two built-in keys.
pub fn write_avro_container(
    schema_json: &str,
    extra_meta: &[(&str, &[u8])],
    records: &[Vec<u8>],
) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(AVRO_MAGIC);

    // Header metadata: one map block containing the two mandatory entries plus
    // the caller's extras, followed by the zero count that terminates the map.
    encode_long((2 + extra_meta.len()) as i64, &mut buf);
    encode_string("avro.schema", &mut buf);
    encode_bytes_field(schema_json.as_bytes(), &mut buf);
    encode_string("avro.codec", &mut buf);
    encode_bytes_field(b"null", &mut buf);
    for (key, val) in extra_meta {
        encode_string(key, &mut buf);
        encode_bytes_field(val, &mut buf);
    }
    encode_long(0, &mut buf);
    buf.extend_from_slice(SYNC_MARKER);

    if !records.is_empty() {
        // The block's byte length must precede its payload, so sum the record
        // sizes up front instead of staging the payload in a temporary buffer.
        let block_len: usize = records.iter().map(Vec::len).sum();
        encode_long(records.len() as i64, &mut buf);
        encode_long(block_len as i64, &mut buf);

        // One reservation covers the whole payload and the trailing marker.
        buf.reserve(block_len + SYNC_MARKER.len());
        for record in records {
            buf.extend_from_slice(record);
        }
        buf.extend_from_slice(SYNC_MARKER);
    }

    buf
}