1use crate::{
20 decode::{decode, decode_internal},
21 from_value,
22 rabin::Rabin,
23 schema::{
24 resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25 ResolvedSchema, Schema,
26 },
27 types::Value,
28 util, AvroResult, Codec, Error,
29};
30use serde::de::DeserializeOwned;
31use serde_json::from_slice;
32use std::{
33 collections::HashMap,
34 io::{ErrorKind, Read},
35 marker::PhantomData,
36 str::FromStr,
37};
38
39#[derive(Debug, Clone)]
41struct Block<'r, R> {
42 reader: R,
43 buf: Vec<u8>,
45 buf_idx: usize,
46 message_count: usize,
48 marker: [u8; 16],
49 codec: Codec,
50 writer_schema: Schema,
51 schemata: Vec<&'r Schema>,
52 user_metadata: HashMap<String, Vec<u8>>,
53 names_refs: Names,
54}
55
56impl<'r, R: Read> Block<'r, R> {
57 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<R>> {
58 let mut block = Block {
59 reader,
60 codec: Codec::Null,
61 writer_schema: Schema::Null,
62 schemata,
63 buf: vec![],
64 buf_idx: 0,
65 message_count: 0,
66 marker: [0; 16],
67 user_metadata: Default::default(),
68 names_refs: Default::default(),
69 };
70
71 block.read_header()?;
72 Ok(block)
73 }
74
75 fn read_header(&mut self) -> AvroResult<()> {
78 let meta_schema = Schema::map(Schema::Bytes);
79
80 let mut buf = [0u8; 4];
81 self.reader
82 .read_exact(&mut buf)
83 .map_err(Error::ReadHeader)?;
84
85 if buf != [b'O', b'b', b'j', 1u8] {
86 return Err(Error::HeaderMagic);
87 }
88
89 if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90 self.read_writer_schema(&metadata)?;
91 self.codec = read_codec(&metadata)?;
92
93 for (key, value) in metadata {
94 if key == "avro.schema" || key == "avro.codec" {
95 } else if key.starts_with("avro.") {
97 warn!("Ignoring unknown metadata key: {}", key);
98 } else {
99 self.read_user_metadata(key, value);
100 }
101 }
102 } else {
103 return Err(Error::GetHeaderMetadata);
104 }
105
106 self.reader
107 .read_exact(&mut self.marker)
108 .map_err(Error::ReadMarker)
109 }
110
111 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
112 self.buf.resize(util::safe_len(n)?, 0);
124 self.reader
125 .read_exact(&mut self.buf)
126 .map_err(Error::ReadIntoBuf)?;
127 self.buf_idx = 0;
128 Ok(())
129 }
130
131 fn read_block_next(&mut self) -> AvroResult<()> {
134 assert!(self.is_empty(), "Expected self to be empty!");
135 match util::read_long(&mut self.reader) {
136 Ok(block_len) => {
137 self.message_count = block_len as usize;
138 let block_bytes = util::read_long(&mut self.reader)?;
139 self.fill_buf(block_bytes as usize)?;
140 let mut marker = [0u8; 16];
141 self.reader
142 .read_exact(&mut marker)
143 .map_err(Error::ReadBlockMarker)?;
144
145 if marker != self.marker {
146 return Err(Error::GetBlockMarker);
147 }
148
149 self.codec.decompress(&mut self.buf)
156 }
157 Err(Error::ReadVariableIntegerBytes(io_err)) => {
158 if let ErrorKind::UnexpectedEof = io_err.kind() {
159 Ok(())
161 } else {
162 Err(Error::ReadVariableIntegerBytes(io_err))
163 }
164 }
165 Err(e) => Err(e),
166 }
167 }
168
169 fn len(&self) -> usize {
170 self.message_count
171 }
172
173 fn is_empty(&self) -> bool {
174 self.len() == 0
175 }
176
177 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
178 if self.is_empty() {
179 self.read_block_next()?;
180 if self.is_empty() {
181 return Ok(None);
182 }
183 }
184
185 let mut block_bytes = &self.buf[self.buf_idx..];
186 let b_original = block_bytes.len();
187
188 let item = decode_internal(
189 &self.writer_schema,
190 &self.names_refs,
191 &None,
192 &mut block_bytes,
193 )?;
194 let item = match read_schema {
195 Some(schema) => item.resolve(schema)?,
196 None => item,
197 };
198
199 if b_original == block_bytes.len() {
200 return Err(Error::ReadBlock);
202 }
203 self.buf_idx += b_original - block_bytes.len();
204 self.message_count -= 1;
205 Ok(Some(item))
206 }
207
208 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
209 let json: serde_json::Value = metadata
210 .get("avro.schema")
211 .and_then(|bytes| {
212 if let Value::Bytes(ref bytes) = *bytes {
213 from_slice(bytes.as_ref()).ok()
214 } else {
215 None
216 }
217 })
218 .ok_or(Error::GetAvroSchemaFromMap)?;
219 if !self.schemata.is_empty() {
220 let rs = ResolvedSchema::try_from(self.schemata.clone())?;
221 let names: Names = rs
222 .get_names()
223 .iter()
224 .map(|(name, schema)| (name.clone(), (*schema).clone()))
225 .collect();
226 self.writer_schema = Schema::parse_with_names(&json, names)?;
227 resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
228 } else {
229 self.writer_schema = Schema::parse(&json)?;
230 resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
231 }
232 Ok(())
233 }
234
235 fn read_user_metadata(&mut self, key: String, value: Value) {
236 match value {
237 Value::Bytes(ref vec) => {
238 self.user_metadata.insert(key, vec.clone());
239 }
240 wrong => {
241 warn!(
242 "User metadata values must be Value::Bytes, found {:?}",
243 wrong
244 );
245 }
246 }
247 }
248}
249
250fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
251 let result = metadata
252 .get("avro.codec")
253 .map(|codec| {
254 if let Value::Bytes(ref bytes) = *codec {
255 match std::str::from_utf8(bytes.as_ref()) {
256 Ok(utf8) => Ok(utf8),
257 Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
258 }
259 } else {
260 Err(Error::BadCodecMetadata)
261 }
262 })
263 .map(|codec_res| match codec_res {
264 Ok(codec) => match Codec::from_str(codec) {
265 Ok(codec) => Ok(codec),
266 Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
267 },
268 Err(err) => Err(err),
269 });
270
271 match result {
272 Some(res) => res,
273 None => Ok(Codec::Null),
274 }
275}
276
277pub struct Reader<'a, R> {
293 block: Block<'a, R>,
294 reader_schema: Option<&'a Schema>,
295 errored: bool,
296 should_resolve_schema: bool,
297}
298
299impl<'a, R: Read> Reader<'a, R> {
300 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
305 let block = Block::new(reader, vec![])?;
306 let reader = Reader {
307 block,
308 reader_schema: None,
309 errored: false,
310 should_resolve_schema: false,
311 };
312 Ok(reader)
313 }
314
315 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
320 let block = Block::new(reader, vec![schema])?;
321 let mut reader = Reader {
322 block,
323 reader_schema: Some(schema),
324 errored: false,
325 should_resolve_schema: false,
326 };
327 reader.should_resolve_schema = reader.writer_schema() != schema;
329 Ok(reader)
330 }
331
332 pub fn with_schemata(
337 schema: &'a Schema,
338 schemata: Vec<&'a Schema>,
339 reader: R,
340 ) -> AvroResult<Reader<'a, R>> {
341 let block = Block::new(reader, schemata)?;
342 let mut reader = Reader {
343 block,
344 reader_schema: Some(schema),
345 errored: false,
346 should_resolve_schema: false,
347 };
348 reader.should_resolve_schema = reader.writer_schema() != schema;
350 Ok(reader)
351 }
352
353 #[inline]
355 pub fn writer_schema(&self) -> &Schema {
356 &self.block.writer_schema
357 }
358
359 #[inline]
361 pub fn reader_schema(&self) -> Option<&Schema> {
362 self.reader_schema
363 }
364
365 #[inline]
367 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
368 &self.block.user_metadata
369 }
370
371 #[inline]
372 fn read_next(&mut self) -> AvroResult<Option<Value>> {
373 let read_schema = if self.should_resolve_schema {
374 self.reader_schema
375 } else {
376 None
377 };
378
379 self.block.read_next(read_schema)
380 }
381}
382
383impl<'a, R: Read> Iterator for Reader<'a, R> {
384 type Item = AvroResult<Value>;
385
386 fn next(&mut self) -> Option<Self::Item> {
387 if self.errored {
389 return None;
390 };
391 match self.read_next() {
392 Ok(opt) => opt.map(Ok),
393 Err(e) => {
394 self.errored = true;
395 Some(Err(e))
396 }
397 }
398 }
399}
400
401pub fn from_avro_datum<R: Read>(
410 writer_schema: &Schema,
411 reader: &mut R,
412 reader_schema: Option<&Schema>,
413) -> AvroResult<Value> {
414 let value = decode(writer_schema, reader)?;
415 match reader_schema {
416 Some(schema) => value.resolve(schema),
417 None => Ok(value),
418 }
419}
420
421pub fn from_avro_datum_schemata<R: Read>(
428 writer_schema: &Schema,
429 schemata: Vec<&Schema>,
430 reader: &mut R,
431 reader_schema: Option<&Schema>,
432) -> AvroResult<Value> {
433 let rs = ResolvedSchema::try_from(schemata)?;
434 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
435 match reader_schema {
436 Some(schema) => value.resolve(schema),
437 None => Ok(value),
438 }
439}
440
441pub struct GenericSingleObjectReader {
442 write_schema: ResolvedOwnedSchema,
443 expected_header: [u8; 10],
444}
445
446impl GenericSingleObjectReader {
447 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
448 let fingerprint = schema.fingerprint::<Rabin>();
449 let expected_header = [
450 0xC3,
451 0x01,
452 fingerprint.bytes[0],
453 fingerprint.bytes[1],
454 fingerprint.bytes[2],
455 fingerprint.bytes[3],
456 fingerprint.bytes[4],
457 fingerprint.bytes[5],
458 fingerprint.bytes[6],
459 fingerprint.bytes[7],
460 ];
461 Ok(GenericSingleObjectReader {
462 write_schema: ResolvedOwnedSchema::try_from(schema)?,
463 expected_header,
464 })
465 }
466
467 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
468 let mut header: [u8; 10] = [0; 10];
469 match reader.read_exact(&mut header) {
470 Ok(_) => {
471 if self.expected_header == header {
472 decode_internal(
473 self.write_schema.get_root_schema(),
474 self.write_schema.get_names(),
475 &None,
476 reader,
477 )
478 } else {
479 Err(Error::SingleObjectHeaderMismatch(
480 self.expected_header,
481 header,
482 ))
483 }
484 }
485 Err(io_error) => Err(Error::ReadHeader(io_error)),
486 }
487 }
488}
489
490pub struct SpecificSingleObjectReader<T>
491where
492 T: AvroSchema,
493{
494 inner: GenericSingleObjectReader,
495 _model: PhantomData<T>,
496}
497
498impl<T> SpecificSingleObjectReader<T>
499where
500 T: AvroSchema,
501{
502 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
503 Ok(SpecificSingleObjectReader {
504 inner: GenericSingleObjectReader::new(T::get_schema())?,
505 _model: PhantomData,
506 })
507 }
508}
509
510impl<T> SpecificSingleObjectReader<T>
511where
512 T: AvroSchema + From<Value>,
513{
514 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
515 self.inner.read_value(reader).map(|v| v.into())
516 }
517}
518
519impl<T> SpecificSingleObjectReader<T>
520where
521 T: AvroSchema + DeserializeOwned,
522{
523 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
524 from_value::<T>(&self.inner.read_value(reader)?)
525 }
526}
527
528pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
530 assert!(
531 bytes.len() > 16,
532 "The bytes are too short to read a marker from them"
533 );
534 let mut marker = [0_u8; 16];
535 marker.clone_from_slice(&bytes[(bytes.len() - 16)..]);
536 marker
537}
538
539#[cfg(test)]
540mod tests {
541 use super::*;
542 use crate::{encode::encode, types::Record};
543 use apache_avro_test_helper::TestResult;
544 use pretty_assertions::assert_eq;
545 use serde::Deserialize;
546 use std::io::Cursor;
547
548 const SCHEMA: &str = r#"
549 {
550 "type": "record",
551 "name": "test",
552 "fields": [
553 {
554 "name": "a",
555 "type": "long",
556 "default": 42
557 },
558 {
559 "name": "b",
560 "type": "string"
561 }
562 ]
563 }
564 "#;
565 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
566 const ENCODED: &[u8] = &[
567 79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
568 101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
569 114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
570 58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
571 100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
572 97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
573 103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
574 50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
575 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
576 110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
577 100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
578 207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
579 6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
580 207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
581 ];
582
583 #[test]
584 fn test_from_avro_datum() -> TestResult {
585 let schema = Schema::parse_str(SCHEMA)?;
586 let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
587
588 let mut record = Record::new(&schema).unwrap();
589 record.put("a", 27i64);
590 record.put("b", "foo");
591 let expected = record.into();
592
593 assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
594
595 Ok(())
596 }
597
598 #[test]
599 fn test_from_avro_datum_with_union_to_struct() -> TestResult {
600 const TEST_RECORD_SCHEMA_3240: &str = r#"
601 {
602 "type": "record",
603 "name": "test",
604 "fields": [
605 {
606 "name": "a",
607 "type": "long",
608 "default": 42
609 },
610 {
611 "name": "b",
612 "type": "string"
613 },
614 {
615 "name": "a_nullable_array",
616 "type": ["null", {"type": "array", "items": {"type": "string"}}],
617 "default": null
618 },
619 {
620 "name": "a_nullable_boolean",
621 "type": ["null", {"type": "boolean"}],
622 "default": null
623 },
624 {
625 "name": "a_nullable_string",
626 "type": ["null", {"type": "string"}],
627 "default": null
628 }
629 ]
630 }
631 "#;
632 #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
633 struct TestRecord3240 {
634 a: i64,
635 b: String,
636 a_nullable_array: Option<Vec<String>>,
637 a_nullable_string: Option<String>,
640 }
641
642 let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
643 let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
644
645 let expected_record: TestRecord3240 = TestRecord3240 {
646 a: 27i64,
647 b: String::from("foo"),
648 a_nullable_array: None,
649 a_nullable_string: None,
650 };
651
652 let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
653 let parsed_record: TestRecord3240 = match &avro_datum {
654 Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
655 unexpected => {
656 panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
657 }
658 };
659
660 assert_eq!(parsed_record, expected_record);
661
662 Ok(())
663 }
664
665 #[test]
666 fn test_null_union() -> TestResult {
667 let schema = Schema::parse_str(UNION_SCHEMA)?;
668 let mut encoded: &'static [u8] = &[2, 0];
669
670 assert_eq!(
671 from_avro_datum(&schema, &mut encoded, None)?,
672 Value::Union(1, Box::new(Value::Long(0)))
673 );
674
675 Ok(())
676 }
677
678 #[test]
679 fn test_reader_iterator() -> TestResult {
680 let schema = Schema::parse_str(SCHEMA)?;
681 let reader = Reader::with_schema(&schema, ENCODED)?;
682
683 let mut record1 = Record::new(&schema).unwrap();
684 record1.put("a", 27i64);
685 record1.put("b", "foo");
686
687 let mut record2 = Record::new(&schema).unwrap();
688 record2.put("a", 42i64);
689 record2.put("b", "bar");
690
691 let expected = [record1.into(), record2.into()];
692
693 for (i, value) in reader.enumerate() {
694 assert_eq!(value?, expected[i]);
695 }
696
697 Ok(())
698 }
699
700 #[test]
701 fn test_reader_invalid_header() -> TestResult {
702 let schema = Schema::parse_str(SCHEMA)?;
703 let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
704 assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
705
706 Ok(())
707 }
708
709 #[test]
710 fn test_reader_invalid_block() -> TestResult {
711 let schema = Schema::parse_str(SCHEMA)?;
712 let invalid = ENCODED
713 .iter()
714 .copied()
715 .rev()
716 .skip(19)
717 .collect::<Vec<u8>>()
718 .into_iter()
719 .rev()
720 .collect::<Vec<u8>>();
721 let reader = Reader::with_schema(&schema, &invalid[..])?;
722 for value in reader {
723 assert!(value.is_err());
724 }
725
726 Ok(())
727 }
728
729 #[test]
730 fn test_reader_empty_buffer() -> TestResult {
731 let empty = Cursor::new(Vec::new());
732 assert!(Reader::new(empty).is_err());
733
734 Ok(())
735 }
736
737 #[test]
738 fn test_reader_only_header() -> TestResult {
739 let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
740 let reader = Reader::new(&invalid[..])?;
741 for value in reader {
742 assert!(value.is_err());
743 }
744
745 Ok(())
746 }
747
748 #[test]
749 fn test_avro_3405_read_user_metadata_success() -> TestResult {
750 use crate::writer::Writer;
751
752 let schema = Schema::parse_str(SCHEMA)?;
753 let mut writer = Writer::new(&schema, Vec::new());
754
755 let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
756 user_meta_data.insert(
757 "stringKey".to_string(),
758 "stringValue".to_string().into_bytes(),
759 );
760 user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
761 user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
762
763 for (k, v) in user_meta_data.iter() {
764 writer.add_user_metadata(k.to_string(), v)?;
765 }
766
767 let mut record = Record::new(&schema).unwrap();
768 record.put("a", 27i64);
769 record.put("b", "foo");
770
771 writer.append(record.clone())?;
772 writer.append(record.clone())?;
773 writer.flush()?;
774 let result = writer.into_inner()?;
775
776 let reader = Reader::new(&result[..])?;
777 assert_eq!(reader.user_metadata(), &user_meta_data);
778
779 Ok(())
780 }
781
782 #[derive(Deserialize, Clone, PartialEq, Debug)]
783 struct TestSingleObjectReader {
784 a: i64,
785 b: f64,
786 c: Vec<String>,
787 }
788
789 impl AvroSchema for TestSingleObjectReader {
790 fn get_schema() -> Schema {
791 let schema = r#"
792 {
793 "type":"record",
794 "name":"TestSingleObjectWrtierSerialize",
795 "fields":[
796 {
797 "name":"a",
798 "type":"long"
799 },
800 {
801 "name":"b",
802 "type":"double"
803 },
804 {
805 "name":"c",
806 "type":{
807 "type":"array",
808 "items":"string"
809 }
810 }
811 ]
812 }
813 "#;
814 Schema::parse_str(schema).unwrap()
815 }
816 }
817
818 impl From<Value> for TestSingleObjectReader {
819 fn from(obj: Value) -> TestSingleObjectReader {
820 if let Value::Record(fields) = obj {
821 let mut a = None;
822 let mut b = None;
823 let mut c = vec![];
824 for (field_name, v) in fields {
825 match (field_name.as_str(), v) {
826 ("a", Value::Long(i)) => a = Some(i),
827 ("b", Value::Double(d)) => b = Some(d),
828 ("c", Value::Array(v)) => {
829 for inner_val in v {
830 if let Value::String(s) = inner_val {
831 c.push(s);
832 }
833 }
834 }
835 (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
836 }
837 }
838 TestSingleObjectReader {
839 a: a.unwrap(),
840 b: b.unwrap(),
841 c,
842 }
843 } else {
844 panic!("Expected a Value::Record but was {obj:?}")
845 }
846 }
847 }
848
849 impl From<TestSingleObjectReader> for Value {
850 fn from(obj: TestSingleObjectReader) -> Value {
851 Value::Record(vec![
852 ("a".into(), obj.a.into()),
853 ("b".into(), obj.b.into()),
854 (
855 "c".into(),
856 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
857 ),
858 ])
859 }
860 }
861
862 #[test]
863 fn test_avro_3507_single_object_reader() -> TestResult {
864 let obj = TestSingleObjectReader {
865 a: 42,
866 b: 3.33,
867 c: vec!["cat".into(), "dog".into()],
868 };
869 let mut to_read = Vec::<u8>::new();
870 to_read.extend_from_slice(&[0xC3, 0x01]);
871 to_read.extend_from_slice(
872 &TestSingleObjectReader::get_schema()
873 .fingerprint::<Rabin>()
874 .bytes[..],
875 );
876 encode(
877 &obj.clone().into(),
878 &TestSingleObjectReader::get_schema(),
879 &mut to_read,
880 )
881 .expect("Encode should succeed");
882 let mut to_read = &to_read[..];
883 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
884 .expect("Schema should resolve");
885 let val = generic_reader
886 .read_value(&mut to_read)
887 .expect("Should read");
888 let expected_value: Value = obj.into();
889 assert_eq!(expected_value, val);
890
891 Ok(())
892 }
893
894 #[test]
895 fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
896 let obj = TestSingleObjectReader {
897 a: 42,
898 b: 3.33,
899 c: vec!["cat".into(), "dog".into()],
900 };
901 let to_read_1 = [0xC3, 0x01];
903 let mut to_read_2 = Vec::<u8>::new();
904 to_read_2.extend_from_slice(
905 &TestSingleObjectReader::get_schema()
906 .fingerprint::<Rabin>()
907 .bytes[..],
908 );
909 let mut to_read_3 = Vec::<u8>::new();
910 encode(
911 &obj.clone().into(),
912 &TestSingleObjectReader::get_schema(),
913 &mut to_read_3,
914 )
915 .expect("Encode should succeed");
916 let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
917 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
918 .expect("Schema should resolve");
919 let val = generic_reader
920 .read_value(&mut to_read)
921 .expect("Should read");
922 let expected_value: Value = obj.into();
923 assert_eq!(expected_value, val);
924
925 Ok(())
926 }
927
928 #[test]
929 fn test_avro_3507_reader_parity() -> TestResult {
930 let obj = TestSingleObjectReader {
931 a: 42,
932 b: 3.33,
933 c: vec!["cat".into(), "dog".into()],
934 };
935
936 let mut to_read = Vec::<u8>::new();
937 to_read.extend_from_slice(&[0xC3, 0x01]);
938 to_read.extend_from_slice(
939 &TestSingleObjectReader::get_schema()
940 .fingerprint::<Rabin>()
941 .bytes[..],
942 );
943 encode(
944 &obj.clone().into(),
945 &TestSingleObjectReader::get_schema(),
946 &mut to_read,
947 )
948 .expect("Encode should succeed");
949 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
950 .expect("Schema should resolve");
951 let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
952 .expect("schema should resolve");
953 let mut to_read1 = &to_read[..];
954 let mut to_read2 = &to_read[..];
955 let mut to_read3 = &to_read[..];
956
957 let val = generic_reader
958 .read_value(&mut to_read1)
959 .expect("Should read");
960 let read_obj1 = specific_reader
961 .read_from_value(&mut to_read2)
962 .expect("Should read from value");
963 let read_obj2 = specific_reader
964 .read(&mut to_read3)
965 .expect("Should read from deserilize");
966 let expected_value: Value = obj.clone().into();
967 assert_eq!(obj, read_obj1);
968 assert_eq!(obj, read_obj2);
969 assert_eq!(val, expected_value);
970
971 Ok(())
972 }
973
974 #[cfg(not(feature = "snappy"))]
975 #[test]
976 fn test_avro_3549_read_not_enabled_codec() {
977 let snappy_compressed_avro = vec![
978 79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
979 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
980 110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
981 103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
982 44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
983 108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
984 58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
985 101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
986 203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
987 209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
988 ];
989
990 if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
991 assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
992 } else {
993 panic!("Expected an error in the reading of the codec!");
994 }
995 }
996}