avro_schema/read/
mod.rs

1//! Functions to read and decompress Files' metadata and blocks
2mod block;
3mod decode;
4pub(crate) mod decompress;
5
6use std::io::Read;
7
8use crate::error::Error;
9use crate::file::FileMetadata;
10use crate::schema::Schema;
11
12pub use fallible_streaming_iterator;
13
14// macros that can operate in sync and async code.
15macro_rules! avro_decode {
16    ($reader:ident $($_await:tt)*) => {
17        {
18            let mut i = 0u64;
19            let mut buf = [0u8; 1];
20            let mut j = 0;
21            loop {
22                if j > 9 {
23                    // if j * 7 > 64
24                    return Err(DecodeError::OutOfSpec);
25                }
26                $reader.read_exact(&mut buf[..])$($_await)*.map_err(|_| DecodeError::EndOfFile)?;
27                i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
28                if (buf[0] >> 7) == 0 {
29                    break;
30                } else {
31                    j += 1;
32                }
33            }
34
35            Ok(i)
36        }
37    }
38}
39
40macro_rules! read_header {
41    ($reader:ident $($_await:tt)*) => {{
42        let mut items = HashMap::new();
43
44        loop {
45            let len = zigzag_i64($reader)$($_await)*.map_err(|_| Error::OutOfSpec)? as usize;
46            if len == 0 {
47                break Ok(items);
48            }
49
50            items.reserve(len);
51            for _ in 0..len {
52                let key = _read_binary($reader)$($_await)*?;
53                let key = String::from_utf8(key)
54                    .map_err(|_| Error::OutOfSpec)?;
55                let value = _read_binary($reader)$($_await)*?;
56                items.insert(key, value);
57            }
58        }
59    }};
60}
61
62macro_rules! read_metadata_macro {
63    ($reader:ident $($_await:tt)*) => {{
64        let mut magic_number = [0u8; 4];
65        $reader.read_exact(&mut magic_number)$($_await)*.map_err(|_| Error::OutOfSpec)?;
66
67        // see https://avro.apache.org/docs/current/spec.html#Object+Container+Files
68        if magic_number != [b'O', b'b', b'j', 1u8] {
69            return Err(Error::OutOfSpec);
70        }
71
72        let header = decode::read_header($reader)$($_await)*?;
73
74        let (schema, compression) = deserialize_header(header)?;
75
76        let marker = decode::read_file_marker($reader)$($_await)*?;
77
78        let record = if let Schema::Record(record) = schema {
79            record
80        } else {
81            return Err(Error::OutOfSpec)
82        };
83
84        Ok(FileMetadata {
85            record,
86            compression,
87            marker,
88        })
89    }};
90}
91
92#[allow(unused_imports)]
93pub(crate) use {
94    avro_decode, decode::deserialize_header, decode::DecodeError, read_header, read_metadata_macro,
95};
96
97/// Reads the metadata from `reader` into [`FileMetadata`].
98/// # Error
99/// This function errors iff the header is not a valid avro file header.
100pub fn read_metadata<R: Read>(reader: &mut R) -> Result<FileMetadata, Error> {
101    read_metadata_macro!(reader)
102}
103
104pub use decompress::{block_iterator, BlockStreamingIterator};