1use std::str::{from_utf8, FromStr};
27
28use serde_json::from_slice;
29
30use crate::decode::{decode, AvroRead};
31use crate::error::{DecodeError, Error as AvroError};
32use crate::schema::{
33 resolve_schemas, FullName, NamedSchemaPiece, ParseSchemaError, RecordField,
34 ResolvedDefaultValueField, SchemaNodeOrNamed, SchemaPiece, SchemaPieceOrNamed,
35 SchemaPieceRefOrNamed,
36};
37use crate::schema::{ResolvedRecordField, Schema};
38use crate::types::{AvroMap, Value};
39use crate::util::{self};
40use crate::{Codec, SchemaResolutionError};
41
42use sha2::Sha256;
43use std::collections::HashMap;
44
/// Parsed header of an Avro object container file.
#[derive(Debug, Clone)]
pub(crate) struct Header {
    /// Schema the data was written with, parsed from the `avro.schema` metadata entry.
    writer_schema: Schema,
    /// 16-byte sync marker; every data block must be followed by these exact bytes.
    marker: [u8; 16],
    /// Codec applied to each data block (from `avro.codec`, `Codec::Null` when absent).
    codec: Codec,
}
51
52impl Header {
53 pub fn from_reader<R: AvroRead>(reader: &mut R) -> Result<Header, AvroError> {
54 let meta_schema = Schema {
55 named: vec![],
56 indices: Default::default(),
57 top: SchemaPiece::Map(Box::new(SchemaPiece::Bytes.into())).into(),
58 };
59
60 let mut buf = [0u8; 4];
61 reader.read_exact(&mut buf)?;
62
63 if buf != [b'O', b'b', b'j', 1u8] {
64 return Err(AvroError::Decode(DecodeError::WrongHeaderMagic(buf)));
65 }
66
67 if let Value::Map(AvroMap(meta)) = decode(meta_schema.top_node(), reader)? {
68 let json = meta
70 .get("avro.schema")
71 .ok_or(AvroError::Decode(DecodeError::MissingAvroDotSchema))
72 .and_then(|bytes| {
73 if let Value::Bytes(ref bytes) = *bytes {
74 from_slice(bytes.as_ref()).map_err(|e| {
75 AvroError::ParseSchema(ParseSchemaError::new(format!(
76 "unable to decode schema bytes: {}",
77 e
78 )))
79 })
80 } else {
81 unreachable!()
82 }
83 })?;
84 let writer_schema = Schema::parse(&json).map_err(|e| {
85 ParseSchemaError::new(format!("unable to parse json as avro schema: {}", e))
86 })?;
87
88 let codec = meta
89 .get("avro.codec")
90 .map(|val| match val {
91 Value::Bytes(ref bytes) => from_utf8(bytes.as_ref())
92 .map_err(|_e| AvroError::Decode(DecodeError::CodecUtf8Error))
93 .and_then(|codec| {
94 Codec::from_str(codec).map_err(|_| {
95 AvroError::Decode(DecodeError::UnrecognizedCodec(codec.to_string()))
96 })
97 }),
98 _ => unreachable!(),
99 })
100 .unwrap_or(Ok(Codec::Null))?;
101
102 let mut marker = [0u8; 16];
103 reader.read_exact(&mut marker)?;
104
105 Ok(Header {
106 writer_schema,
107 marker,
108 codec,
109 })
110 } else {
111 unreachable!()
112 }
113 }
114
115 pub fn into_parts(self) -> (Schema, [u8; 16], Codec) {
116 (self.writer_schema, self.marker, self.codec)
117 }
118}
119
/// Reads Avro values, block by block, from an object container file.
pub struct Reader<R> {
    /// Parsed file header (writer schema, sync marker, codec).
    header: Header,
    /// Underlying byte source.
    inner: R,
    /// Set once the `Iterator` impl has yielded an error; iteration then returns `None`.
    errored: bool,
    /// Writer schema resolved against a reader schema; `None` when no resolution was needed.
    resolved_schema: Option<Schema>,
    /// Number of values in the current block that have not been decoded yet.
    messages_remaining: usize,
    /// Contents of the current block, after `codec.decompress`.
    buf: Vec<u8>,
    /// Offset in `buf` of the next value to decode.
    buf_idx: usize,
}
130
/// Iterates over the (decompressed) data blocks of an object container file
/// without decoding the individual values inside them.
pub struct BlockIter<R> {
    /// Reader whose block buffer and message count are handed out as `Block`s.
    inner: Reader<R>,
}
135
/// One data block of an object container file, as produced by `BlockIter`.
#[derive(Debug, Clone)]
pub struct Block {
    /// Block contents after decompression with the file's codec.
    pub bytes: Vec<u8>,
    /// Number of values encoded in `bytes`.
    pub len: usize,
}
144
145impl<R: AvroRead> BlockIter<R> {
146 pub fn with_schema(reader_schema: &Schema, inner: R) -> Result<Self, AvroError> {
147 Ok(Self {
148 inner: Reader::with_schema(reader_schema, inner)?,
149 })
150 }
151}
152
153impl<R: AvroRead> Iterator for BlockIter<R> {
154 type Item = Result<Block, AvroError>;
155
156 fn next(&mut self) -> Option<Self::Item> {
157 assert!(self.inner.is_empty());
158
159 match self.inner.read_block_next() {
160 Ok(()) => {
161 if self.inner.is_empty() {
162 None
163 } else {
164 let bytes = std::mem::take(&mut self.inner.buf);
165 let len = std::mem::take(&mut self.inner.messages_remaining);
166 Some(Ok(Block { bytes, len }))
167 }
168 }
169 Err(e) => Some(Err(e)),
170 }
171 }
172}
173
174impl<R: AvroRead> Reader<R> {
175 pub fn new(mut inner: R) -> Result<Reader<R>, AvroError> {
180 let header = Header::from_reader(&mut inner)?;
181 let reader = Reader {
182 header,
183 inner,
184 errored: false,
185 resolved_schema: None,
186 messages_remaining: 0,
187 buf: vec![],
188 buf_idx: 0,
189 };
190 Ok(reader)
191 }
192
193 pub fn with_schema(reader_schema: &Schema, mut inner: R) -> Result<Reader<R>, AvroError> {
198 let header = Header::from_reader(&mut inner)?;
199
200 let writer_schema = &header.writer_schema;
201 let resolved_schema = if reader_schema.fingerprint::<Sha256>().bytes
202 != writer_schema.fingerprint::<Sha256>().bytes
203 {
204 Some(resolve_schemas(&writer_schema, reader_schema)?)
205 } else {
206 None
207 };
208
209 Ok(Reader {
210 header,
211 errored: false,
212 resolved_schema,
213 inner,
214 messages_remaining: 0,
215 buf: vec![],
216 buf_idx: 0,
217 })
218 }
219
220 pub fn writer_schema(&self) -> &Schema {
222 &self.header.writer_schema
223 }
224
225 pub fn schema(&self) -> &Schema {
229 match &self.resolved_schema {
230 Some(schema) => schema,
231 None => self.writer_schema(),
232 }
233 }
234
235 #[inline]
236 pub fn read_next(&mut self) -> Result<Option<Value>, AvroError> {
238 if self.is_empty() {
239 self.read_block_next()?;
240 if self.is_empty() {
241 return Ok(None);
242 }
243 }
244
245 let mut block_bytes = &self.buf[self.buf_idx..];
246 let b_original = block_bytes.len();
247 let schema = self.schema();
248 let item = from_avro_datum(schema, &mut block_bytes)?;
249 self.buf_idx += b_original - block_bytes.len();
250 self.messages_remaining -= 1;
251 Ok(Some(item))
252 }
253
254 fn is_empty(&self) -> bool {
255 self.messages_remaining == 0
256 }
257
258 fn fill_buf(&mut self, n: usize) -> Result<(), AvroError> {
259 if n >= self.buf.len() {
261 self.buf.resize(n, 0);
262 }
263
264 self.inner.read_exact(&mut self.buf[..n])?;
265 self.buf_idx = 0;
266 Ok(())
267 }
268
269 fn read_block_next(&mut self) -> Result<(), AvroError> {
270 assert!(self.is_empty(), "Expected self to be empty!");
271 match util::read_long(&mut self.inner) {
272 Ok(block_len) => {
273 self.messages_remaining = block_len as usize;
274 let block_bytes = util::read_long(&mut self.inner)?;
275 self.fill_buf(block_bytes as usize)?;
276 let mut marker = [0u8; 16];
277 self.inner.read_exact(&mut marker)?;
278
279 if marker != self.header.marker {
280 return Err(DecodeError::MismatchedBlockHeader {
281 expected: self.header.marker,
282 actual: marker,
283 }
284 .into());
285 }
286
287 self.header.codec.decompress(&mut self.buf)?;
294
295 Ok(())
296 }
297 Err(e) => {
298 if let AvroError::IO(std::io::ErrorKind::UnexpectedEof) = e {
299 Ok(())
301 } else {
302 Err(e)
303 }
304 }
305 }
306 }
307}
308
309impl<R: AvroRead> Iterator for Reader<R> {
310 type Item = Result<Value, AvroError>;
311
312 fn next(&mut self) -> Option<Self::Item> {
313 if self.errored {
315 return None;
316 };
317 match self.read_next() {
318 Ok(opt) => opt.map(Ok),
319 Err(e) => {
320 self.errored = true;
321 Some(Err(e))
322 }
323 }
324 }
325}
326
/// State accumulated while resolving a writer schema against a reader schema.
pub struct SchemaResolver<'a> {
    /// Resolved named types, indexed by resolved index. A `None` entry is a
    /// placeholder for a named type whose resolution is still in progress.
    pub named: Vec<Option<NamedSchemaPiece>>,
    /// Full name of each resolved named type -> its index in `named`.
    pub indices: HashMap<FullName, usize>,
    /// Stack of type/field names used to build paths in error messages.
    pub human_readable_field_path: Vec<String>,
    /// Index in `human_readable_field_path` where the innermost named type's
    /// path starts; error messages show the path from here on.
    pub current_human_readable_path_start: usize,
    /// Writer named-type index -> reader named-type index it resolves against.
    pub writer_to_reader_names: HashMap<usize, usize>,
    /// Reader named-type index -> writer named-type index (inverse mapping).
    pub reader_to_writer_names: HashMap<usize, usize>,
    /// Reader named-type index -> index of its resolved piece in `named`.
    pub reader_to_resolved_names: HashMap<usize, usize>,
    /// Reader named-type index -> full name. NOTE(review): not read in this
    /// file; presumably populated/used by the caller — confirm.
    pub reader_fullnames: HashMap<usize, &'a FullName>,
    /// The reader schema; used to look up names of resolved named types.
    pub reader_schema: &'a Schema,
}
338
339impl<'a> SchemaResolver<'a> {
340 fn resolve_named(
341 &mut self,
342 writer: &Schema,
343 reader: &Schema,
344 writer_index: usize,
345 reader_index: usize,
346 ) -> Result<SchemaPiece, AvroError> {
347 let ws = writer.lookup(writer_index);
348 let rs = reader.lookup(reader_index);
349 let typ = match (&ws.piece, &rs.piece) {
350 (
351 SchemaPiece::Record {
352 fields: w_fields,
353 lookup: w_lookup,
354 ..
355 },
356 SchemaPiece::Record {
357 fields: r_fields,
358 lookup: _r_lookup,
359 ..
360 },
361 ) => {
362 let mut defaults = Vec::new();
363 let mut fields: Vec<Option<RecordField>> = Vec::new();
364 for (r_index, rf) in r_fields.iter().enumerate() {
365 match w_lookup.get(&rf.name) {
366 None => {
367 let default_field = match &rf.default {
368 Some(v) => ResolvedDefaultValueField {
369 name: rf.name.clone(),
370 doc: rf.doc.clone(),
371 default: reader
372 .top_node_or_named()
373 .step(&rf.schema)
374 .lookup()
375 .json_to_value(v)?,
376 order: rf.order.clone(),
377 position: r_index,
378 },
379 None => {
380 return Err(SchemaResolutionError::new(format!(
381 "Reader field `{}.{}` not found in writer, and has no default",
382 self.get_current_human_readable_path(),
383 rf.name
384 ))
385 .into())
386 }
387 };
388 defaults.push(default_field);
389 }
390 Some(w_index) => {
391 if fields.len() > *w_index && fields[*w_index].is_some() {
392 return Err(SchemaResolutionError::new(format!(
393 "Duplicate field `{}.{}` in schema",
394 self.get_current_human_readable_path(),
395 rf.name
396 ))
397 .into());
398 }
399 let wf = &w_fields[*w_index];
400 let w_node = SchemaNodeOrNamed {
401 root: writer,
402 inner: wf.schema.as_ref(),
403 };
404 let r_node = SchemaNodeOrNamed {
405 root: reader,
406 inner: rf.schema.as_ref(),
407 };
408
409 self.human_readable_field_path.push(rf.name.clone());
410 let new_inner = self.resolve(w_node, r_node)?;
411 self.human_readable_field_path.pop();
412
413 let field = RecordField {
414 name: rf.name.clone(),
415 doc: rf.doc.clone(),
416 default: rf.default.clone(),
417 schema: new_inner,
418 order: rf.order.clone(),
419 position: r_index,
420 };
421 while fields.len() <= *w_index {
422 fields.push(None);
423 }
424 fields[*w_index] = Some(field)
425 }
426 }
427 }
428 while fields.len() < w_fields.len() {
429 fields.push(None);
430 }
431 let mut n_present = 0;
432 let fields = fields
433 .into_iter()
434 .enumerate()
435 .map(|(i, rf)| match rf {
436 Some(rf) => {
437 n_present += 1;
438 ResolvedRecordField::Present(rf)
439 }
440 None => {
441 let writer_schema_piece = SchemaNodeOrNamed {
453 root: writer,
454 inner: w_fields[i].schema.as_ref(),
455 }
456 .to_schema();
457 ResolvedRecordField::Absent(writer_schema_piece)
458 }
459 })
460 .collect();
461 let n_reader_fields = defaults.len() + n_present;
462 SchemaPiece::ResolveRecord {
463 defaults,
464 fields,
465 n_reader_fields,
466 }
467 }
468 (
469 SchemaPiece::Enum {
470 symbols: w_symbols, ..
471 },
472 SchemaPiece::Enum {
473 symbols: r_symbols,
474 doc,
475 default_idx,
476 },
477 ) => {
478 let r_map = r_symbols
479 .iter()
480 .enumerate()
481 .map(|(i, s)| (s, i))
482 .collect::<HashMap<_, _>>();
483 let symbols = w_symbols
484 .iter()
485 .map(|s| {
486 r_map
487 .get(s)
488 .map(|i| (*i, s.clone()))
489 .ok_or_else(|| s.clone())
490 })
491 .collect();
492 SchemaPiece::ResolveEnum {
493 doc: doc.clone(),
494 symbols,
495 default: default_idx.map(|i| (i, r_symbols[i].clone())),
496 }
497 }
498 (SchemaPiece::Fixed { size: wsz }, SchemaPiece::Fixed { size: rsz }) => {
499 if *wsz == *rsz {
500 SchemaPiece::Fixed { size: *wsz }
501 } else {
502 return Err(SchemaResolutionError::new(format!(
503 "Fixed schema {:?}: sizes don't match ({}, {}) for field `{}`",
504 &rs.name,
505 wsz,
506 rsz,
507 self.get_current_human_readable_path(),
508 ))
509 .into());
510 }
511 }
512 (
513 SchemaPiece::Decimal {
514 precision: wp,
515 scale: wscale,
516 fixed_size: wsz,
517 },
518 SchemaPiece::Decimal {
519 precision: rp,
520 scale: rscale,
521 fixed_size: rsz,
522 },
523 ) => {
524 if wp != rp {
525 return Err(SchemaResolutionError::new(format!(
526 "Decimal schema {:?}: precisions don't match: {}, {} for field `{}`",
527 &rs.name,
528 wp,
529 rp,
530 self.get_current_human_readable_path(),
531 ))
532 .into());
533 }
534 if wscale != rscale {
535 return Err(SchemaResolutionError::new(format!(
536 "Decimal schema {:?}: sizes don't match: {}, {} for field `{}`",
537 &rs.name,
538 wscale,
539 rscale,
540 self.get_current_human_readable_path(),
541 ))
542 .into());
543 }
544 if wsz != rsz {
545 return Err(SchemaResolutionError::new(format!(
546 "Decimal schema {:?}: sizes don't match: {:?}, {:?} for field `{}`",
547 &rs.name,
548 wsz,
549 rsz,
550 self.get_current_human_readable_path(),
551 ))
552 .into());
553 }
554 SchemaPiece::Decimal {
555 precision: *wp,
556 scale: *wscale,
557 fixed_size: *wsz,
558 }
559 }
560 (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Fixed { size })
561 if *fixed_size == Some(*size) =>
562 {
563 SchemaPiece::Fixed { size: *size }
564 }
565 (
566 SchemaPiece::Fixed { size },
567 SchemaPiece::Decimal {
568 precision,
569 scale,
570 fixed_size,
571 },
572 ) if *fixed_size == Some(*size) => SchemaPiece::Decimal {
573 precision: *precision,
574 scale: *scale,
575 fixed_size: *fixed_size,
576 },
577
578 (_, SchemaPiece::ResolveRecord { .. })
579 | (_, SchemaPiece::ResolveEnum { .. })
580 | (SchemaPiece::ResolveRecord { .. }, _)
581 | (SchemaPiece::ResolveEnum { .. }, _) => {
582 return Err(SchemaResolutionError::new(
583 "Attempted to resolve an already resolved schema".to_string(),
584 )
585 .into());
586 }
587
588 (_wt, _rt) => {
589 return Err(SchemaResolutionError::new(format!(
590 "Non-matching schemas: writer: {:?}, reader: {:?}",
591 ws.name, rs.name
592 ))
593 .into())
594 }
595 };
596 Ok(typ)
597 }
598
599 pub fn resolve(
600 &mut self,
601 writer: SchemaNodeOrNamed,
602 reader: SchemaNodeOrNamed,
603 ) -> Result<SchemaPieceOrNamed, AvroError> {
604 let previous_human_readable_path_start = self.current_human_readable_path_start;
605 let (_, named_node) = reader.inner.get_piece_and_name(reader.root);
606 if let Some(full_name) = named_node {
607 self.current_human_readable_path_start = self.human_readable_field_path.len();
608 self.human_readable_field_path.push(full_name.human_name());
609 }
610
611 let inner = match (writer.inner, reader.inner) {
612 (
626 SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)),
627 SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner)),
628 ) => {
629 let w2r = self.writer_to_reader_names.clone();
630 let permutation = w_inner
638 .variants()
639 .iter()
640 .map(|w_variant| {
641 let (r_idx, r_variant) =
642 r_inner.match_(w_variant, &w2r).ok_or_else(|| {
643 SchemaResolutionError::new(format!(
644 "Failed to match writer union variant `{}` against any variant in the reader for field `{}`",
645 w_variant.get_human_name(writer.root),
646 self.get_current_human_readable_path()
647 ))
648 })?;
649 let resolved =
650 self.resolve(writer.step(w_variant), reader.step(r_variant))?;
651 Ok((r_idx, resolved))
652 })
653 .collect();
654 let n_reader_variants = r_inner.variants().len();
655 let reader_null_variant = r_inner
656 .variants()
657 .iter()
658 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
659 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionUnion {
660 permutation,
661 n_reader_variants,
662 reader_null_variant,
663 })
664 }
665 (other, SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner))) => {
667 let n_reader_variants = r_inner.variants().len();
668 let reader_null_variant = r_inner
669 .variants()
670 .iter()
671 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
672 let (index, r_inner) = r_inner
673 .match_ref(other, &self.writer_to_reader_names)
674 .ok_or_else(|| {
675 SchemaResolutionError::new(
676 format!("No matching schema in reader union for writer type `{}` for field `{}`",
677 other.get_human_name(writer.root),
678 self.get_current_human_readable_path()))
679 })?;
680 let inner = Box::new(self.resolve(writer.step_ref(other), reader.step(r_inner))?);
681 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveConcreteUnion {
682 index,
683 inner,
684 n_reader_variants,
685 reader_null_variant,
686 })
687 }
688 (SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)), other) => {
690 let (index, w_inner) = w_inner
691 .match_ref(other, &self.reader_to_writer_names)
692 .ok_or_else(|| {
693 SchemaResolutionError::new(
694 format!("No matching schema in writer union for reader type `{}` for field `{}`",
695 other.get_human_name(writer.root),
696 self.get_current_human_readable_path()))
697 })?;
698 let inner = Box::new(self.resolve(writer.step(w_inner), reader.step_ref(other))?);
699 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionConcrete { index, inner })
700 }
701 (SchemaPieceRefOrNamed::Piece(wp), SchemaPieceRefOrNamed::Piece(rp)) => {
703 match (wp, rp) {
704 (SchemaPiece::TimestampMilli, SchemaPiece::TimestampMicro) => {
711 SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMilli)
712 }
713 (SchemaPiece::TimestampMicro, SchemaPiece::TimestampMilli) => {
715 SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMicro)
716 }
717 (SchemaPiece::Date, SchemaPiece::TimestampMilli)
718 | (SchemaPiece::Date, SchemaPiece::TimestampMicro) => {
719 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveDateTimestamp)
720 }
721 (wp, rp) if wp.is_underlying_int() && rp.is_underlying_int() => {
722 SchemaPieceOrNamed::Piece(rp.clone()) }
724 (wp, rp) if wp.is_underlying_long() && rp.is_underlying_long() => {
725 SchemaPieceOrNamed::Piece(rp.clone()) }
727 (wp, SchemaPiece::TimestampMilli) if wp.is_underlying_int() => {
728 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMilli)
729 }
730 (wp, SchemaPiece::TimestampMicro) if wp.is_underlying_int() => {
731 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMicro)
732 }
733 (SchemaPiece::Null, SchemaPiece::Null) => {
734 SchemaPieceOrNamed::Piece(SchemaPiece::Null)
735 }
736 (SchemaPiece::Boolean, SchemaPiece::Boolean) => {
737 SchemaPieceOrNamed::Piece(SchemaPiece::Boolean)
738 }
739 (SchemaPiece::Int, SchemaPiece::Long) => {
740 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntLong)
741 }
742 (SchemaPiece::Int, SchemaPiece::Float) => {
743 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntFloat)
744 }
745 (SchemaPiece::Int, SchemaPiece::Double) => {
746 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntDouble)
747 }
748 (SchemaPiece::Long, SchemaPiece::Float) => {
749 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongFloat)
750 }
751 (SchemaPiece::Long, SchemaPiece::Double) => {
752 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongDouble)
753 }
754 (SchemaPiece::Float, SchemaPiece::Float) => {
755 SchemaPieceOrNamed::Piece(SchemaPiece::Float)
756 }
757 (SchemaPiece::Float, SchemaPiece::Double) => {
758 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveFloatDouble)
759 }
760 (SchemaPiece::Double, SchemaPiece::Double) => {
761 SchemaPieceOrNamed::Piece(SchemaPiece::Double)
762 }
763 (b, SchemaPiece::Bytes)
764 if b == &SchemaPiece::Bytes || b == &SchemaPiece::String =>
765 {
766 SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
767 }
768 (s, SchemaPiece::String)
769 if s == &SchemaPiece::String || s == &SchemaPiece::Bytes =>
770 {
771 SchemaPieceOrNamed::Piece(SchemaPiece::String)
772 }
773 (SchemaPiece::Array(w_inner), SchemaPiece::Array(r_inner)) => {
774 let inner =
775 self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
776 SchemaPieceOrNamed::Piece(SchemaPiece::Array(Box::new(inner)))
777 }
778 (SchemaPiece::Map(w_inner), SchemaPiece::Map(r_inner)) => {
779 let inner =
780 self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
781 SchemaPieceOrNamed::Piece(SchemaPiece::Map(Box::new(inner)))
782 }
783 (
784 SchemaPiece::Decimal {
785 precision: wp,
786 scale: ws,
787 fixed_size: wf,
788 },
789 SchemaPiece::Decimal {
790 precision: rp,
791 scale: rs,
792 fixed_size: rf,
793 },
794 ) => {
795 if wp == rp && ws == rs && wf == rf {
796 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
797 precision: *wp,
798 scale: *ws,
799 fixed_size: *wf,
800 })
801 } else {
802 return Err(SchemaResolutionError::new(format!(
803 "Decimal types must match in precision, scale, and fixed size. \
804 Got ({:?}, {:?}, {:?}); ({:?}, {:?}. {:?}) for field `{}`",
805 wp,
806 ws,
807 wf,
808 rp,
809 rs,
810 rf,
811 self.get_current_human_readable_path(),
812 ))
813 .into());
814 }
815 }
816 (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Bytes)
817 if *fixed_size == None =>
818 {
819 SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
820 }
821 (SchemaPiece::Json, SchemaPiece::Json) => {
825 SchemaPieceOrNamed::Piece(SchemaPiece::Json)
826 }
827 (SchemaPiece::Uuid, SchemaPiece::Uuid) => {
828 SchemaPieceOrNamed::Piece(SchemaPiece::Uuid)
829 }
830 (
831 SchemaPiece::Bytes,
832 SchemaPiece::Decimal {
833 precision,
834 scale,
835 fixed_size,
836 },
837 ) if *fixed_size == None => SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
838 precision: *precision,
839 scale: *scale,
840 fixed_size: *fixed_size,
841 }),
842 (ws, rs) => {
843 return Err(SchemaResolutionError::new(format!(
844 "Writer schema has type `{:?}`, but reader schema has type `{:?}` for field `{}`",
845 ws,
846 rs,
847 self.get_current_human_readable_path(),
848 ))
849 .into());
850 }
851 }
852 }
853 (SchemaPieceRefOrNamed::Named(w_index), SchemaPieceRefOrNamed::Named(r_index)) => {
855 if self.writer_to_reader_names.get(&w_index) != Some(&r_index) {
856 let (w_name, r_name) = (
858 &writer.root.lookup(w_index).name,
859 &reader.root.lookup(r_index).name,
860 );
861 return Err(SchemaResolutionError::new(format!("Attempted to resolve writer schema node named {} against reader schema node named {}", w_name, r_name)).into());
862 }
863 let idx = match self.reader_to_resolved_names.get(&r_index) {
866 Some(resolved) => *resolved,
867 None => {
868 let resolved_idx = self.named.len();
874 self.reader_to_resolved_names.insert(r_index, resolved_idx);
875 self.named.push(None);
876 let piece =
877 match self.resolve_named(writer.root, reader.root, w_index, r_index) {
878 Ok(piece) => piece,
879 Err(e) => {
880 self.named.pop();
882 self.reader_to_resolved_names.remove(&r_index);
883 return Err(e);
884 }
885 };
886 let name = &self.reader_schema.named[r_index].name;
887 let ns = NamedSchemaPiece {
888 name: name.clone(),
889 piece,
890 };
891 self.named[resolved_idx] = Some(ns);
892 self.indices.insert(name.clone(), resolved_idx);
893
894 resolved_idx
895 }
896 };
897 SchemaPieceOrNamed::Named(idx)
898 }
899 (ws, rs) => {
900 return Err(SchemaResolutionError::new(format!(
901 "Schemas don't match: {:?}, {:?} for field `{}`",
902 ws.get_piece_and_name(&writer.root).0,
903 rs.get_piece_and_name(&reader.root).0,
904 self.get_current_human_readable_path(),
905 ))
906 .into())
907 }
908 };
909 if named_node.is_some() {
910 self.human_readable_field_path.pop();
911 self.current_human_readable_path_start = previous_human_readable_path_start;
912 }
913 Ok(inner)
914 }
915
916 fn get_current_human_readable_path(&self) -> String {
917 return self.human_readable_field_path[self.current_human_readable_path_start..].join(".");
918 }
919}
920
921pub fn from_avro_datum<R: AvroRead>(schema: &Schema, reader: &mut R) -> Result<Value, AvroError> {
930 let value = decode(schema.top_node(), reader)?;
931 Ok(value)
932}
933
/// Unit tests for the container-file reader and for schema resolution.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Record, ToAvro};
    use crate::Reader;

    use std::io::Cursor;

    static SCHEMA: &str = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
    static UNION_SCHEMA: &str = r#"
            ["null", "long"]
        "#;
    /// A container file (null codec) holding one block with two records:
    /// `{a: 27, b: "foo"}` and `{a: 42, b: "bar"}`.
    static ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.avro();

        assert_eq!(from_avro_datum(&schema, &mut encoded).unwrap(), expected);
    }

    #[test]
    fn test_null_union() {
        let schema: Schema = UNION_SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded).unwrap(),
            Value::Union {
                index: 1,
                inner: Box::new(Value::Long(0)),
                n_variants: 2,
                null_variant: Some(0)
            }
        );
    }

    #[test]
    fn test_reader_stream() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let reader = Reader::with_schema(&schema, ENCODED).unwrap();

        let mut record1 = Record::new(schema.top_node()).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(schema.top_node()).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected = vec![record1.avro(), record2.avro()];

        // Compare the complete output so that a reader yielding too few (or no)
        // values fails instead of passing vacuously.
        let values = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(values, expected);
    }

    #[test]
    fn test_block_iter() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let blocks = BlockIter::with_schema(&schema, ENCODED)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0].len, 2);
        assert_eq!(
            blocks[0].bytes,
            vec![54u8, 6, 102, 111, 111, 84, 6, 98, 97, 114]
        );
    }

    #[test]
    fn test_reader_invalid_header() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let invalid = ENCODED.iter().skip(1).copied().collect::<Vec<u8>>();
        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
    }

    #[test]
    fn test_reader_invalid_block() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let invalid = ENCODED
            .iter()
            .rev()
            .skip(19)
            .copied()
            .collect::<Vec<u8>>()
            .into_iter()
            .rev()
            .collect::<Vec<u8>>();
        let reader = Reader::with_schema(&schema, &invalid[..]).unwrap();
        let values: Vec<_> = reader.collect();
        assert!(!values.is_empty(), "a truncated block must surface an error");
        assert!(values.iter().all(|value| value.is_err()));
    }

    #[test]
    fn test_reader_empty_buffer() {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());
    }

    #[test]
    fn test_reader_only_header() {
        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
        let reader = Reader::new(&invalid[..]).unwrap();
        let values: Vec<_> = reader.collect();
        assert!(!values.is_empty(), "a block header without a body must surface an error");
        assert!(values.iter().all(|value| value.is_err()));
    }

    #[test]
    fn test_resolution_nested_types_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "int"}]}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "double"}]}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(&err_str, "Writer schema has type `Double`, but reader schema has type `Int` for field `com.materialize.bar.f1_1`");
    }

    #[test]
    fn test_extra_fields_without_default_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f2", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Reader field `com.materialize.foo.f2` not found in writer, and has no default"
        );
    }

    #[test]
    fn test_duplicate_field_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Duplicate field `com.materialize.bar.f1` in schema"
        );
    }

    #[test]
    fn test_decimal_field_mismatch_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 1}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Decimal types must match in precision, scale, and fixed size. Got (5, 1, None); (4, 2. None) for field `com.materialize.foo.f1`"
        );
    }
}