apache_avro/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19use crate::{
20    decode::{decode, decode_internal},
21    from_value,
22    rabin::Rabin,
23    schema::{
24        resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25        ResolvedSchema, Schema,
26    },
27    types::Value,
28    util, AvroResult, Codec, Error,
29};
30use serde::de::DeserializeOwned;
31use serde_json::from_slice;
32use std::{
33    collections::HashMap,
34    io::{ErrorKind, Read},
35    marker::PhantomData,
36    str::FromStr,
37};
38
39/// Internal Block reader.
40#[derive(Debug, Clone)]
41struct Block<'r, R> {
42    reader: R,
43    /// Internal buffering to reduce allocation.
44    buf: Vec<u8>,
45    buf_idx: usize,
46    /// Number of elements expected to exist within this block.
47    message_count: usize,
48    marker: [u8; 16],
49    codec: Codec,
50    writer_schema: Schema,
51    schemata: Vec<&'r Schema>,
52    user_metadata: HashMap<String, Vec<u8>>,
53    names_refs: Names,
54}
55
56impl<'r, R: Read> Block<'r, R> {
57    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<R>> {
58        let mut block = Block {
59            reader,
60            codec: Codec::Null,
61            writer_schema: Schema::Null,
62            schemata,
63            buf: vec![],
64            buf_idx: 0,
65            message_count: 0,
66            marker: [0; 16],
67            user_metadata: Default::default(),
68            names_refs: Default::default(),
69        };
70
71        block.read_header()?;
72        Ok(block)
73    }
74
75    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
76    /// its content.
77    fn read_header(&mut self) -> AvroResult<()> {
78        let meta_schema = Schema::map(Schema::Bytes);
79
80        let mut buf = [0u8; 4];
81        self.reader
82            .read_exact(&mut buf)
83            .map_err(Error::ReadHeader)?;
84
85        if buf != [b'O', b'b', b'j', 1u8] {
86            return Err(Error::HeaderMagic);
87        }
88
89        if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90            self.read_writer_schema(&metadata)?;
91            self.codec = read_codec(&metadata)?;
92
93            for (key, value) in metadata {
94                if key == "avro.schema" || key == "avro.codec" {
95                    // already processed
96                } else if key.starts_with("avro.") {
97                    warn!("Ignoring unknown metadata key: {}", key);
98                } else {
99                    self.read_user_metadata(key, value);
100                }
101            }
102        } else {
103            return Err(Error::GetHeaderMetadata);
104        }
105
106        self.reader
107            .read_exact(&mut self.marker)
108            .map_err(Error::ReadMarker)
109    }
110
111    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
112        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
113        // invalid bytes.
114        //
115        // The are two cases to handle here:
116        //
117        // 1. `n > self.buf.len()`:
118        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
119        // 2. `n < self.buf.len()`:
120        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
121        //
122        // TODO: Figure out a way to avoid having to truncate for the second case.
123        self.buf.resize(util::safe_len(n)?, 0);
124        self.reader
125            .read_exact(&mut self.buf)
126            .map_err(Error::ReadIntoBuf)?;
127        self.buf_idx = 0;
128        Ok(())
129    }
130
131    /// Try to read a data block, also performing schema resolution for the objects contained in
132    /// the block. The objects are stored in an internal buffer to the `Reader`.
133    fn read_block_next(&mut self) -> AvroResult<()> {
134        assert!(self.is_empty(), "Expected self to be empty!");
135        match util::read_long(&mut self.reader) {
136            Ok(block_len) => {
137                self.message_count = block_len as usize;
138                let block_bytes = util::read_long(&mut self.reader)?;
139                self.fill_buf(block_bytes as usize)?;
140                let mut marker = [0u8; 16];
141                self.reader
142                    .read_exact(&mut marker)
143                    .map_err(Error::ReadBlockMarker)?;
144
145                if marker != self.marker {
146                    return Err(Error::GetBlockMarker);
147                }
148
149                // NOTE (JAB): This doesn't fit this Reader pattern very well.
150                // `self.buf` is a growable buffer that is reused as the reader is iterated.
151                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
152                // and replace `buf` with the new one, instead of reusing the same buffer.
153                // We can address this by using some "limited read" type to decode directly
154                // into the buffer. But this is fine, for now.
155                self.codec.decompress(&mut self.buf)
156            }
157            Err(Error::ReadVariableIntegerBytes(io_err)) => {
158                if let ErrorKind::UnexpectedEof = io_err.kind() {
159                    // to not return any error in case we only finished to read cleanly from the stream
160                    Ok(())
161                } else {
162                    Err(Error::ReadVariableIntegerBytes(io_err))
163                }
164            }
165            Err(e) => Err(e),
166        }
167    }
168
169    fn len(&self) -> usize {
170        self.message_count
171    }
172
173    fn is_empty(&self) -> bool {
174        self.len() == 0
175    }
176
177    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
178        if self.is_empty() {
179            self.read_block_next()?;
180            if self.is_empty() {
181                return Ok(None);
182            }
183        }
184
185        let mut block_bytes = &self.buf[self.buf_idx..];
186        let b_original = block_bytes.len();
187
188        let item = decode_internal(
189            &self.writer_schema,
190            &self.names_refs,
191            &None,
192            &mut block_bytes,
193        )?;
194        let item = match read_schema {
195            Some(schema) => item.resolve(schema)?,
196            None => item,
197        };
198
199        if b_original == block_bytes.len() {
200            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
201            return Err(Error::ReadBlock);
202        }
203        self.buf_idx += b_original - block_bytes.len();
204        self.message_count -= 1;
205        Ok(Some(item))
206    }
207
208    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
209        let json: serde_json::Value = metadata
210            .get("avro.schema")
211            .and_then(|bytes| {
212                if let Value::Bytes(ref bytes) = *bytes {
213                    from_slice(bytes.as_ref()).ok()
214                } else {
215                    None
216                }
217            })
218            .ok_or(Error::GetAvroSchemaFromMap)?;
219        if !self.schemata.is_empty() {
220            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
221            let names: Names = rs
222                .get_names()
223                .iter()
224                .map(|(name, schema)| (name.clone(), (*schema).clone()))
225                .collect();
226            self.writer_schema = Schema::parse_with_names(&json, names)?;
227            resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
228        } else {
229            self.writer_schema = Schema::parse(&json)?;
230            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
231        }
232        Ok(())
233    }
234
235    fn read_user_metadata(&mut self, key: String, value: Value) {
236        match value {
237            Value::Bytes(ref vec) => {
238                self.user_metadata.insert(key, vec.clone());
239            }
240            wrong => {
241                warn!(
242                    "User metadata values must be Value::Bytes, found {:?}",
243                    wrong
244                );
245            }
246        }
247    }
248}
249
250fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
251    let result = metadata
252        .get("avro.codec")
253        .map(|codec| {
254            if let Value::Bytes(ref bytes) = *codec {
255                match std::str::from_utf8(bytes.as_ref()) {
256                    Ok(utf8) => Ok(utf8),
257                    Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
258                }
259            } else {
260                Err(Error::BadCodecMetadata)
261            }
262        })
263        .map(|codec_res| match codec_res {
264            Ok(codec) => match Codec::from_str(codec) {
265                Ok(codec) => Ok(codec),
266                Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
267            },
268            Err(err) => Err(err),
269        });
270
271    match result {
272        Some(res) => res,
273        None => Ok(Codec::Null),
274    }
275}
276
277/// Main interface for reading Avro formatted values.
278///
279/// To be used as an iterator:
280///
281/// ```no_run
282/// # use apache_avro::Reader;
283/// # use std::io::Cursor;
284/// # let input = Cursor::new(Vec::<u8>::new());
285/// for value in Reader::new(input).unwrap() {
286///     match value {
287///         Ok(v) => println!("{:?}", v),
288///         Err(e) => println!("Error: {}", e),
289///     };
290/// }
291/// ```
292pub struct Reader<'a, R> {
293    block: Block<'a, R>,
294    reader_schema: Option<&'a Schema>,
295    errored: bool,
296    should_resolve_schema: bool,
297}
298
299impl<'a, R: Read> Reader<'a, R> {
300    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
301    /// No reader `Schema` will be set.
302    ///
303    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
304    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
305        let block = Block::new(reader, vec![])?;
306        let reader = Reader {
307            block,
308            reader_schema: None,
309            errored: false,
310            should_resolve_schema: false,
311        };
312        Ok(reader)
313    }
314
315    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
316    /// to read from.
317    ///
318    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
319    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
320        let block = Block::new(reader, vec![schema])?;
321        let mut reader = Reader {
322            block,
323            reader_schema: Some(schema),
324            errored: false,
325            should_resolve_schema: false,
326        };
327        // Check if the reader and writer schemas disagree.
328        reader.should_resolve_schema = reader.writer_schema() != schema;
329        Ok(reader)
330    }
331
332    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
333    /// to read from.
334    ///
335    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
336    pub fn with_schemata(
337        schema: &'a Schema,
338        schemata: Vec<&'a Schema>,
339        reader: R,
340    ) -> AvroResult<Reader<'a, R>> {
341        let block = Block::new(reader, schemata)?;
342        let mut reader = Reader {
343            block,
344            reader_schema: Some(schema),
345            errored: false,
346            should_resolve_schema: false,
347        };
348        // Check if the reader and writer schemas disagree.
349        reader.should_resolve_schema = reader.writer_schema() != schema;
350        Ok(reader)
351    }
352
353    /// Get a reference to the writer `Schema`.
354    #[inline]
355    pub fn writer_schema(&self) -> &Schema {
356        &self.block.writer_schema
357    }
358
359    /// Get a reference to the optional reader `Schema`.
360    #[inline]
361    pub fn reader_schema(&self) -> Option<&Schema> {
362        self.reader_schema
363    }
364
365    /// Get a reference to the user metadata
366    #[inline]
367    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
368        &self.block.user_metadata
369    }
370
371    #[inline]
372    fn read_next(&mut self) -> AvroResult<Option<Value>> {
373        let read_schema = if self.should_resolve_schema {
374            self.reader_schema
375        } else {
376            None
377        };
378
379        self.block.read_next(read_schema)
380    }
381}
382
383impl<'a, R: Read> Iterator for Reader<'a, R> {
384    type Item = AvroResult<Value>;
385
386    fn next(&mut self) -> Option<Self::Item> {
387        // to prevent keep on reading after the first error occurs
388        if self.errored {
389            return None;
390        };
391        match self.read_next() {
392            Ok(opt) => opt.map(Ok),
393            Err(e) => {
394                self.errored = true;
395                Some(Err(e))
396            }
397        }
398    }
399}
400
401/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
402/// to read from.
403///
404/// In case a reader `Schema` is provided, schema resolution will also be performed.
405///
406/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
407/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
408/// you are doing, instead.
409pub fn from_avro_datum<R: Read>(
410    writer_schema: &Schema,
411    reader: &mut R,
412    reader_schema: Option<&Schema>,
413) -> AvroResult<Value> {
414    let value = decode(writer_schema, reader)?;
415    match reader_schema {
416        Some(schema) => value.resolve(schema),
417        None => Ok(value),
418    }
419}
420
421/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
422/// to read from.
423/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
424/// schemata to resolve any dependencies.
425///
426/// In case a reader `Schema` is provided, schema resolution will also be performed.
427pub fn from_avro_datum_schemata<R: Read>(
428    writer_schema: &Schema,
429    schemata: Vec<&Schema>,
430    reader: &mut R,
431    reader_schema: Option<&Schema>,
432) -> AvroResult<Value> {
433    let rs = ResolvedSchema::try_from(schemata)?;
434    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
435    match reader_schema {
436        Some(schema) => value.resolve(schema),
437        None => Ok(value),
438    }
439}
440
441pub struct GenericSingleObjectReader {
442    write_schema: ResolvedOwnedSchema,
443    expected_header: [u8; 10],
444}
445
446impl GenericSingleObjectReader {
447    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
448        let fingerprint = schema.fingerprint::<Rabin>();
449        let expected_header = [
450            0xC3,
451            0x01,
452            fingerprint.bytes[0],
453            fingerprint.bytes[1],
454            fingerprint.bytes[2],
455            fingerprint.bytes[3],
456            fingerprint.bytes[4],
457            fingerprint.bytes[5],
458            fingerprint.bytes[6],
459            fingerprint.bytes[7],
460        ];
461        Ok(GenericSingleObjectReader {
462            write_schema: ResolvedOwnedSchema::try_from(schema)?,
463            expected_header,
464        })
465    }
466
467    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
468        let mut header: [u8; 10] = [0; 10];
469        match reader.read_exact(&mut header) {
470            Ok(_) => {
471                if self.expected_header == header {
472                    decode_internal(
473                        self.write_schema.get_root_schema(),
474                        self.write_schema.get_names(),
475                        &None,
476                        reader,
477                    )
478                } else {
479                    Err(Error::SingleObjectHeaderMismatch(
480                        self.expected_header,
481                        header,
482                    ))
483                }
484            }
485            Err(io_error) => Err(Error::ReadHeader(io_error)),
486        }
487    }
488}
489
490pub struct SpecificSingleObjectReader<T>
491where
492    T: AvroSchema,
493{
494    inner: GenericSingleObjectReader,
495    _model: PhantomData<T>,
496}
497
498impl<T> SpecificSingleObjectReader<T>
499where
500    T: AvroSchema,
501{
502    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
503        Ok(SpecificSingleObjectReader {
504            inner: GenericSingleObjectReader::new(T::get_schema())?,
505            _model: PhantomData,
506        })
507    }
508}
509
510impl<T> SpecificSingleObjectReader<T>
511where
512    T: AvroSchema + From<Value>,
513{
514    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
515        self.inner.read_value(reader).map(|v| v.into())
516    }
517}
518
519impl<T> SpecificSingleObjectReader<T>
520where
521    T: AvroSchema + DeserializeOwned,
522{
523    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
524        from_value::<T>(&self.inner.read_value(reader)?)
525    }
526}
527
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`.
///
/// The marker is taken to be the last 16 bytes of `bytes`.
///
/// # Panics
///
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    // `u8` is `Copy`, so a plain byte copy is sufficient.
    marker.copy_from_slice(&bytes[(bytes.len() - 16)..]);
    marker
}
538
539#[cfg(test)]
540mod tests {
541    use super::*;
542    use crate::{encode::encode, types::Record};
543    use apache_avro_test_helper::TestResult;
544    use pretty_assertions::assert_eq;
545    use serde::Deserialize;
546    use std::io::Cursor;
547
548    const SCHEMA: &str = r#"
549    {
550      "type": "record",
551      "name": "test",
552      "fields": [
553        {
554          "name": "a",
555          "type": "long",
556          "default": 42
557        },
558        {
559          "name": "b",
560          "type": "string"
561        }
562      ]
563    }
564    "#;
565    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
566    const ENCODED: &[u8] = &[
567        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
568        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
569        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
570        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
571        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
572        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
573        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
574        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
575        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
576        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
577        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
578        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
579        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
580        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
581    ];
582
583    #[test]
584    fn test_from_avro_datum() -> TestResult {
585        let schema = Schema::parse_str(SCHEMA)?;
586        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
587
588        let mut record = Record::new(&schema).unwrap();
589        record.put("a", 27i64);
590        record.put("b", "foo");
591        let expected = record.into();
592
593        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
594
595        Ok(())
596    }
597
598    #[test]
599    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
600        const TEST_RECORD_SCHEMA_3240: &str = r#"
601    {
602      "type": "record",
603      "name": "test",
604      "fields": [
605        {
606          "name": "a",
607          "type": "long",
608          "default": 42
609        },
610        {
611          "name": "b",
612          "type": "string"
613        },
614        {
615            "name": "a_nullable_array",
616            "type": ["null", {"type": "array", "items": {"type": "string"}}],
617            "default": null
618        },
619        {
620            "name": "a_nullable_boolean",
621            "type": ["null", {"type": "boolean"}],
622            "default": null
623        },
624        {
625            "name": "a_nullable_string",
626            "type": ["null", {"type": "string"}],
627            "default": null
628        }
629      ]
630    }
631    "#;
632        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
633        struct TestRecord3240 {
634            a: i64,
635            b: String,
636            a_nullable_array: Option<Vec<String>>,
637            // we are missing the 'a_nullable_boolean' field to simulate missing keys
638            // a_nullable_boolean: Option<bool>,
639            a_nullable_string: Option<String>,
640        }
641
642        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
643        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
644
645        let expected_record: TestRecord3240 = TestRecord3240 {
646            a: 27i64,
647            b: String::from("foo"),
648            a_nullable_array: None,
649            a_nullable_string: None,
650        };
651
652        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
653        let parsed_record: TestRecord3240 = match &avro_datum {
654            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
655            unexpected => {
656                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
657            }
658        };
659
660        assert_eq!(parsed_record, expected_record);
661
662        Ok(())
663    }
664
665    #[test]
666    fn test_null_union() -> TestResult {
667        let schema = Schema::parse_str(UNION_SCHEMA)?;
668        let mut encoded: &'static [u8] = &[2, 0];
669
670        assert_eq!(
671            from_avro_datum(&schema, &mut encoded, None)?,
672            Value::Union(1, Box::new(Value::Long(0)))
673        );
674
675        Ok(())
676    }
677
678    #[test]
679    fn test_reader_iterator() -> TestResult {
680        let schema = Schema::parse_str(SCHEMA)?;
681        let reader = Reader::with_schema(&schema, ENCODED)?;
682
683        let mut record1 = Record::new(&schema).unwrap();
684        record1.put("a", 27i64);
685        record1.put("b", "foo");
686
687        let mut record2 = Record::new(&schema).unwrap();
688        record2.put("a", 42i64);
689        record2.put("b", "bar");
690
691        let expected = [record1.into(), record2.into()];
692
693        for (i, value) in reader.enumerate() {
694            assert_eq!(value?, expected[i]);
695        }
696
697        Ok(())
698    }
699
700    #[test]
701    fn test_reader_invalid_header() -> TestResult {
702        let schema = Schema::parse_str(SCHEMA)?;
703        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
704        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
705
706        Ok(())
707    }
708
709    #[test]
710    fn test_reader_invalid_block() -> TestResult {
711        let schema = Schema::parse_str(SCHEMA)?;
712        let invalid = ENCODED
713            .iter()
714            .copied()
715            .rev()
716            .skip(19)
717            .collect::<Vec<u8>>()
718            .into_iter()
719            .rev()
720            .collect::<Vec<u8>>();
721        let reader = Reader::with_schema(&schema, &invalid[..])?;
722        for value in reader {
723            assert!(value.is_err());
724        }
725
726        Ok(())
727    }
728
729    #[test]
730    fn test_reader_empty_buffer() -> TestResult {
731        let empty = Cursor::new(Vec::new());
732        assert!(Reader::new(empty).is_err());
733
734        Ok(())
735    }
736
737    #[test]
738    fn test_reader_only_header() -> TestResult {
739        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
740        let reader = Reader::new(&invalid[..])?;
741        for value in reader {
742            assert!(value.is_err());
743        }
744
745        Ok(())
746    }
747
748    #[test]
749    fn test_avro_3405_read_user_metadata_success() -> TestResult {
750        use crate::writer::Writer;
751
752        let schema = Schema::parse_str(SCHEMA)?;
753        let mut writer = Writer::new(&schema, Vec::new());
754
755        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
756        user_meta_data.insert(
757            "stringKey".to_string(),
758            "stringValue".to_string().into_bytes(),
759        );
760        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
761        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
762
763        for (k, v) in user_meta_data.iter() {
764            writer.add_user_metadata(k.to_string(), v)?;
765        }
766
767        let mut record = Record::new(&schema).unwrap();
768        record.put("a", 27i64);
769        record.put("b", "foo");
770
771        writer.append(record.clone())?;
772        writer.append(record.clone())?;
773        writer.flush()?;
774        let result = writer.into_inner()?;
775
776        let reader = Reader::new(&result[..])?;
777        assert_eq!(reader.user_metadata(), &user_meta_data);
778
779        Ok(())
780    }
781
782    #[derive(Deserialize, Clone, PartialEq, Debug)]
783    struct TestSingleObjectReader {
784        a: i64,
785        b: f64,
786        c: Vec<String>,
787    }
788
789    impl AvroSchema for TestSingleObjectReader {
790        fn get_schema() -> Schema {
791            let schema = r#"
792            {
793                "type":"record",
794                "name":"TestSingleObjectWrtierSerialize",
795                "fields":[
796                    {
797                        "name":"a",
798                        "type":"long"
799                    },
800                    {
801                        "name":"b",
802                        "type":"double"
803                    },
804                    {
805                        "name":"c",
806                        "type":{
807                            "type":"array",
808                            "items":"string"
809                        }
810                    }
811                ]
812            }
813            "#;
814            Schema::parse_str(schema).unwrap()
815        }
816    }
817
818    impl From<Value> for TestSingleObjectReader {
819        fn from(obj: Value) -> TestSingleObjectReader {
820            if let Value::Record(fields) = obj {
821                let mut a = None;
822                let mut b = None;
823                let mut c = vec![];
824                for (field_name, v) in fields {
825                    match (field_name.as_str(), v) {
826                        ("a", Value::Long(i)) => a = Some(i),
827                        ("b", Value::Double(d)) => b = Some(d),
828                        ("c", Value::Array(v)) => {
829                            for inner_val in v {
830                                if let Value::String(s) = inner_val {
831                                    c.push(s);
832                                }
833                            }
834                        }
835                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
836                    }
837                }
838                TestSingleObjectReader {
839                    a: a.unwrap(),
840                    b: b.unwrap(),
841                    c,
842                }
843            } else {
844                panic!("Expected a Value::Record but was {obj:?}")
845            }
846        }
847    }
848
849    impl From<TestSingleObjectReader> for Value {
850        fn from(obj: TestSingleObjectReader) -> Value {
851            Value::Record(vec![
852                ("a".into(), obj.a.into()),
853                ("b".into(), obj.b.into()),
854                (
855                    "c".into(),
856                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
857                ),
858            ])
859        }
860    }
861
862    #[test]
863    fn test_avro_3507_single_object_reader() -> TestResult {
864        let obj = TestSingleObjectReader {
865            a: 42,
866            b: 3.33,
867            c: vec!["cat".into(), "dog".into()],
868        };
869        let mut to_read = Vec::<u8>::new();
870        to_read.extend_from_slice(&[0xC3, 0x01]);
871        to_read.extend_from_slice(
872            &TestSingleObjectReader::get_schema()
873                .fingerprint::<Rabin>()
874                .bytes[..],
875        );
876        encode(
877            &obj.clone().into(),
878            &TestSingleObjectReader::get_schema(),
879            &mut to_read,
880        )
881        .expect("Encode should succeed");
882        let mut to_read = &to_read[..];
883        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
884            .expect("Schema should resolve");
885        let val = generic_reader
886            .read_value(&mut to_read)
887            .expect("Should read");
888        let expected_value: Value = obj.into();
889        assert_eq!(expected_value, val);
890
891        Ok(())
892    }
893
894    #[test]
895    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
896        let obj = TestSingleObjectReader {
897            a: 42,
898            b: 3.33,
899            c: vec!["cat".into(), "dog".into()],
900        };
901        // The two-byte marker, to show that the message uses this single-record format
902        let to_read_1 = [0xC3, 0x01];
903        let mut to_read_2 = Vec::<u8>::new();
904        to_read_2.extend_from_slice(
905            &TestSingleObjectReader::get_schema()
906                .fingerprint::<Rabin>()
907                .bytes[..],
908        );
909        let mut to_read_3 = Vec::<u8>::new();
910        encode(
911            &obj.clone().into(),
912            &TestSingleObjectReader::get_schema(),
913            &mut to_read_3,
914        )
915        .expect("Encode should succeed");
916        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
917        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
918            .expect("Schema should resolve");
919        let val = generic_reader
920            .read_value(&mut to_read)
921            .expect("Should read");
922        let expected_value: Value = obj.into();
923        assert_eq!(expected_value, val);
924
925        Ok(())
926    }
927
928    #[test]
929    fn test_avro_3507_reader_parity() -> TestResult {
930        let obj = TestSingleObjectReader {
931            a: 42,
932            b: 3.33,
933            c: vec!["cat".into(), "dog".into()],
934        };
935
936        let mut to_read = Vec::<u8>::new();
937        to_read.extend_from_slice(&[0xC3, 0x01]);
938        to_read.extend_from_slice(
939            &TestSingleObjectReader::get_schema()
940                .fingerprint::<Rabin>()
941                .bytes[..],
942        );
943        encode(
944            &obj.clone().into(),
945            &TestSingleObjectReader::get_schema(),
946            &mut to_read,
947        )
948        .expect("Encode should succeed");
949        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
950            .expect("Schema should resolve");
951        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
952            .expect("schema should resolve");
953        let mut to_read1 = &to_read[..];
954        let mut to_read2 = &to_read[..];
955        let mut to_read3 = &to_read[..];
956
957        let val = generic_reader
958            .read_value(&mut to_read1)
959            .expect("Should read");
960        let read_obj1 = specific_reader
961            .read_from_value(&mut to_read2)
962            .expect("Should read from value");
963        let read_obj2 = specific_reader
964            .read(&mut to_read3)
965            .expect("Should read from deserilize");
966        let expected_value: Value = obj.clone().into();
967        assert_eq!(obj, read_obj1);
968        assert_eq!(obj, read_obj2);
969        assert_eq!(val, expected_value);
970
971        Ok(())
972    }
973
974    #[cfg(not(feature = "snappy"))]
975    #[test]
976    fn test_avro_3549_read_not_enabled_codec() {
977        let snappy_compressed_avro = vec![
978            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
979            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
980            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
981            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
982            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
983            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
984            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
985            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
986            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
987            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
988        ];
989
990        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
991            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
992        } else {
993            panic!("Expected an error in the reading of the codec!");
994        }
995    }
996}