1mod block;
3mod decode;
4pub(crate) mod decompress;
5
6use std::io::Read;
7
8use crate::error::Error;
9use crate::file::FileMetadata;
10use crate::schema::Schema;
11
12pub use fallible_streaming_iterator;
13
14macro_rules! avro_decode {
16 ($reader:ident $($_await:tt)*) => {
17 {
18 let mut i = 0u64;
19 let mut buf = [0u8; 1];
20 let mut j = 0;
21 loop {
22 if j > 9 {
23 return Err(DecodeError::OutOfSpec);
25 }
26 $reader.read_exact(&mut buf[..])$($_await)*.map_err(|_| DecodeError::EndOfFile)?;
27 i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
28 if (buf[0] >> 7) == 0 {
29 break;
30 } else {
31 j += 1;
32 }
33 }
34
35 Ok(i)
36 }
37 }
38}
39
40macro_rules! read_header {
41 ($reader:ident $($_await:tt)*) => {{
42 let mut items = HashMap::new();
43
44 loop {
45 let len = zigzag_i64($reader)$($_await)*.map_err(|_| Error::OutOfSpec)? as usize;
46 if len == 0 {
47 break Ok(items);
48 }
49
50 items.reserve(len);
51 for _ in 0..len {
52 let key = _read_binary($reader)$($_await)*?;
53 let key = String::from_utf8(key)
54 .map_err(|_| Error::OutOfSpec)?;
55 let value = _read_binary($reader)$($_await)*?;
56 items.insert(key, value);
57 }
58 }
59 }};
60}
61
62macro_rules! read_metadata_macro {
63 ($reader:ident $($_await:tt)*) => {{
64 let mut magic_number = [0u8; 4];
65 $reader.read_exact(&mut magic_number)$($_await)*.map_err(|_| Error::OutOfSpec)?;
66
67 if magic_number != [b'O', b'b', b'j', 1u8] {
69 return Err(Error::OutOfSpec);
70 }
71
72 let header = decode::read_header($reader)$($_await)*?;
73
74 let (schema, compression) = deserialize_header(header)?;
75
76 let marker = decode::read_file_marker($reader)$($_await)*?;
77
78 let record = if let Schema::Record(record) = schema {
79 record
80 } else {
81 return Err(Error::OutOfSpec)
82 };
83
84 Ok(FileMetadata {
85 record,
86 compression,
87 marker,
88 })
89 }};
90}
91
92#[allow(unused_imports)]
93pub(crate) use {
94 avro_decode, decode::deserialize_header, decode::DecodeError, read_header, read_metadata_macro,
95};
96
97pub fn read_metadata<R: Read>(reader: &mut R) -> Result<FileMetadata, Error> {
101 read_metadata_macro!(reader)
102}
103
104pub use decompress::{block_iterator, BlockStreamingIterator};