Skip to main content

avro_rs/
reader.rs

1//! Logic handling reading from Avro format at user level.
2use crate::{decode::decode, schema::Schema, types::Value, util, AvroResult, Codec, Error};
3use serde_json::from_slice;
4use std::{
5    io::{ErrorKind, Read},
6    str::FromStr,
7};
8
9// Internal Block reader.
10#[derive(Debug, Clone)]
11struct Block<R> {
12    reader: R,
13    // Internal buffering to reduce allocation.
14    buf: Vec<u8>,
15    buf_idx: usize,
16    // Number of elements expected to exist within this block.
17    message_count: usize,
18    marker: [u8; 16],
19    codec: Codec,
20    writer_schema: Schema,
21}
22
23impl<R: Read> Block<R> {
24    fn new(reader: R) -> AvroResult<Block<R>> {
25        let mut block = Block {
26            reader,
27            codec: Codec::Null,
28            writer_schema: Schema::Null,
29            buf: vec![],
30            buf_idx: 0,
31            message_count: 0,
32            marker: [0; 16],
33        };
34
35        block.read_header()?;
36        Ok(block)
37    }
38
39    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
40    /// its content.
41    fn read_header(&mut self) -> AvroResult<()> {
42        let meta_schema = Schema::Map(Box::new(Schema::Bytes));
43
44        let mut buf = [0u8; 4];
45        self.reader
46            .read_exact(&mut buf)
47            .map_err(Error::ReadHeader)?;
48
49        if buf != [b'O', b'b', b'j', 1u8] {
50            return Err(Error::HeaderMagic);
51        }
52
53        if let Value::Map(meta) = decode(&meta_schema, &mut self.reader)? {
54            // TODO: surface original parse schema errors instead of coalescing them here
55            let json = meta
56                .get("avro.schema")
57                .and_then(|bytes| {
58                    if let Value::Bytes(ref bytes) = *bytes {
59                        from_slice(bytes.as_ref()).ok()
60                    } else {
61                        None
62                    }
63                })
64                .ok_or(Error::GetAvroSchemaFromMap)?;
65            self.writer_schema = Schema::parse(&json)?;
66
67            if let Some(codec) = meta
68                .get("avro.codec")
69                .and_then(|codec| {
70                    if let Value::Bytes(ref bytes) = *codec {
71                        std::str::from_utf8(bytes.as_ref()).ok()
72                    } else {
73                        None
74                    }
75                })
76                .and_then(|codec| Codec::from_str(codec).ok())
77            {
78                self.codec = codec;
79            }
80        } else {
81            return Err(Error::GetHeaderMetadata);
82        }
83
84        self.reader
85            .read_exact(&mut self.marker)
86            .map_err(Error::ReadMarker)
87    }
88
89    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
90        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
91        // invalid bytes.
92        //
93        // The are two cases to handle here:
94        //
95        // 1. `n > self.buf.len()`:
96        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
97        // 2. `n < self.buf.len()`:
98        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
99        //
100        // TODO: Figure out a way to avoid having to truncate for the second case.
101        self.buf.resize(n, 0);
102        self.reader
103            .read_exact(&mut self.buf)
104            .map_err(Error::ReadIntoBuf)?;
105        self.buf_idx = 0;
106        Ok(())
107    }
108
109    /// Try to read a data block, also performing schema resolution for the objects contained in
110    /// the block. The objects are stored in an internal buffer to the `Reader`.
111    fn read_block_next(&mut self) -> AvroResult<()> {
112        assert!(self.is_empty(), "Expected self to be empty!");
113        match util::read_long(&mut self.reader) {
114            Ok(block_len) => {
115                self.message_count = block_len as usize;
116                let block_bytes = util::read_long(&mut self.reader)?;
117                self.fill_buf(block_bytes as usize)?;
118                let mut marker = [0u8; 16];
119                self.reader
120                    .read_exact(&mut marker)
121                    .map_err(Error::ReadBlockMarker)?;
122
123                if marker != self.marker {
124                    return Err(Error::GetBlockMarker);
125                }
126
127                // NOTE (JAB): This doesn't fit this Reader pattern very well.
128                // `self.buf` is a growable buffer that is reused as the reader is iterated.
129                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
130                // and replace `buf` with the new one, instead of reusing the same buffer.
131                // We can address this by using some "limited read" type to decode directly
132                // into the buffer. But this is fine, for now.
133                self.codec.decompress(&mut self.buf)
134            }
135            Err(Error::ReadVariableIntegerBytes(io_err)) => {
136                if let ErrorKind::UnexpectedEof = io_err.kind() {
137                    // to not return any error in case we only finished to read cleanly from the stream
138                    Ok(())
139                } else {
140                    Err(Error::ReadVariableIntegerBytes(io_err))
141                }
142            }
143            Err(e) => Err(e),
144        }
145    }
146
147    fn len(&self) -> usize {
148        self.message_count
149    }
150
151    fn is_empty(&self) -> bool {
152        self.len() == 0
153    }
154
155    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
156        if self.is_empty() {
157            self.read_block_next()?;
158            if self.is_empty() {
159                return Ok(None);
160            }
161        }
162
163        let mut block_bytes = &self.buf[self.buf_idx..];
164        let b_original = block_bytes.len();
165        let item = from_avro_datum(&self.writer_schema, &mut block_bytes, read_schema)?;
166        self.buf_idx += b_original - block_bytes.len();
167        self.message_count -= 1;
168        Ok(Some(item))
169    }
170}
171
172/// Main interface for reading Avro formatted values.
173///
174/// To be used as an iterator:
175///
176/// ```no_run
177/// # use avro_rs::Reader;
178/// # use std::io::Cursor;
179/// # let input = Cursor::new(Vec::<u8>::new());
180/// for value in Reader::new(input).unwrap() {
181///     match value {
182///         Ok(v) => println!("{:?}", v),
183///         Err(e) => println!("Error: {}", e),
184///     };
185/// }
186/// ```
187pub struct Reader<'a, R> {
188    block: Block<R>,
189    reader_schema: Option<&'a Schema>,
190    errored: bool,
191    should_resolve_schema: bool,
192}
193
194impl<'a, R: Read> Reader<'a, R> {
195    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
196    /// No reader `Schema` will be set.
197    ///
198    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
199    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
200        let block = Block::new(reader)?;
201        let reader = Reader {
202            block,
203            reader_schema: None,
204            errored: false,
205            should_resolve_schema: false,
206        };
207        Ok(reader)
208    }
209
210    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
211    /// to read from.
212    ///
213    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
214    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
215        let block = Block::new(reader)?;
216        let mut reader = Reader {
217            block,
218            reader_schema: Some(schema),
219            errored: false,
220            should_resolve_schema: false,
221        };
222        // Check if the reader and writer schemas disagree.
223        reader.should_resolve_schema = reader.writer_schema() != schema;
224        Ok(reader)
225    }
226
227    /// Get a reference to the writer `Schema`.
228    pub fn writer_schema(&self) -> &Schema {
229        &self.block.writer_schema
230    }
231
232    /// Get a reference to the optional reader `Schema`.
233    pub fn reader_schema(&self) -> Option<&Schema> {
234        self.reader_schema
235    }
236
237    #[inline]
238    fn read_next(&mut self) -> AvroResult<Option<Value>> {
239        let read_schema = if self.should_resolve_schema {
240            self.reader_schema
241        } else {
242            None
243        };
244
245        self.block.read_next(read_schema)
246    }
247}
248
249impl<'a, R: Read> Iterator for Reader<'a, R> {
250    type Item = AvroResult<Value>;
251
252    fn next(&mut self) -> Option<Self::Item> {
253        // to prevent keep on reading after the first error occurs
254        if self.errored {
255            return None;
256        };
257        match self.read_next() {
258            Ok(opt) => opt.map(Ok),
259            Err(e) => {
260                self.errored = true;
261                Some(Err(e))
262            }
263        }
264    }
265}
266
267/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
268/// to read from.
269///
270/// In case a reader `Schema` is provided, schema resolution will also be performed.
271///
272/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
273/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
274/// you are doing, instead.
275pub fn from_avro_datum<R: Read>(
276    writer_schema: &Schema,
277    reader: &mut R,
278    reader_schema: Option<&Schema>,
279) -> AvroResult<Value> {
280    let value = decode(writer_schema, reader)?;
281    match reader_schema {
282        Some(ref schema) => value.resolve(schema),
283        None => Ok(value),
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{types::Record, Reader};
    use std::io::Cursor;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// Object container file holding two `test` records (27/"foo" and 42/"bar") in a single
    /// uncompressed block.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];
    /// Length of the header in `ENCODED`: magic (4), metadata map (143) and sync marker (16).
    const HEADER_LEN: usize = 163;

    #[test]
    fn test_from_avro_datum() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None).unwrap(),
            expected
        );
    }

    #[test]
    fn test_null_union() {
        let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None).unwrap(),
            Value::Union(Box::new(Value::Long(0)))
        );
    }

    #[test]
    fn test_reader_iterator() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        let reader = Reader::with_schema(&schema, ENCODED).unwrap();

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];
        // Collect everything first so a reader yielding too few (or too many) values fails the
        // test instead of passing vacuously.
        let actual = reader.collect::<AvroResult<Vec<Value>>>().unwrap();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_reader_invalid_header() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        // Dropping the first byte breaks the `Obj\x01` magic.
        let invalid = &ENCODED[1..];
        assert!(Reader::with_schema(&schema, invalid).is_err());
    }

    #[test]
    fn test_reader_invalid_block() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        // Drop the last 19 bytes: the block's 16-byte sync marker plus the last 3 payload bytes.
        let invalid = &ENCODED[..ENCODED.len() - 19];
        let reader = Reader::with_schema(&schema, invalid).unwrap();
        let values: Vec<_> = reader.collect();
        // The truncated block is reported once, after which iteration stops.
        assert_eq!(values.len(), 1);
        assert!(values[0].is_err());
    }

    #[test]
    fn test_reader_empty_buffer() {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());
    }

    #[test]
    fn test_reader_only_header() {
        // Header plus the block's object count and byte size, but none of its payload.
        let invalid = &ENCODED[..HEADER_LEN + 2];
        let reader = Reader::new(invalid).unwrap();
        let values: Vec<_> = reader.collect();
        assert_eq!(values.len(), 1);
        assert!(values[0].is_err());
    }

    #[test]
    fn test_reader_header_without_blocks() {
        // A stream that ends cleanly right after the header holds no values and no error.
        let mut reader = Reader::new(&ENCODED[..HEADER_LEN]).unwrap();
        assert!(reader.next().is_none());
    }
}