avro_schema/write/
file.rs

1use std::collections::HashMap;
2
3use crate::error::Error;
4use crate::file::Compression;
5use crate::schema::{Record, Schema};
6
7use super::encode;
8
/// Fixed 16-byte value this module writes as the file's sync marker
/// (the bytes 1, 2, 3, 4 repeated four times).
pub(crate) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
/// Leading magic of an Avro file: the ASCII bytes `O`, `b`, `j` followed by the byte `1`.
pub(crate) const AVRO_MAGIC: [u8; 4] = *b"Obj\x01";
12
13/// Serializes an [`Schema`] and optional [`Compression`] into an avro header.
14fn serialize_header(
15    schema: &Schema,
16    compression: Option<Compression>,
17) -> Result<HashMap<String, Vec<u8>>, Error> {
18    let schema = serde_json::to_string(schema).map_err(|_| Error::OutOfSpec)?;
19
20    let mut header = HashMap::<String, Vec<u8>>::default();
21
22    header.insert("avro.schema".to_string(), schema.into_bytes());
23    if let Some(compression) = compression {
24        let value = match compression {
25            Compression::Snappy => b"snappy".to_vec(),
26            Compression::Deflate => b"deflate".to_vec(),
27        };
28        header.insert("avro.codec".to_string(), value);
29    };
30
31    Ok(header)
32}
33
34/// Writes Avro's metadata to `writer`.
35pub fn write_metadata<W: std::io::Write>(
36    writer: &mut W,
37    record: Record,
38    compression: Option<Compression>,
39) -> Result<(), Error> {
40    writer.write_all(&AVRO_MAGIC)?;
41
42    // * file metadata, including the schema.
43    let schema = Schema::Record(record);
44
45    write_schema(writer, &schema, compression)?;
46
47    // The 16-byte, randomly-generated sync marker for this file.
48    writer.write_all(&SYNC_NUMBER)?;
49
50    Ok(())
51}
52
53pub(crate) fn write_schema<W: std::io::Write>(
54    writer: &mut W,
55    schema: &Schema,
56    compression: Option<Compression>,
57) -> Result<(), Error> {
58    let header = serialize_header(schema, compression)?;
59
60    encode::zigzag_encode(header.len() as i64, writer)?;
61    for (name, item) in header {
62        encode::write_binary(name.as_bytes(), writer)?;
63        encode::write_binary(&item, writer)?;
64    }
65    writer.write_all(&[0])?;
66    Ok(())
67}