mz_avro/
reader.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic handling reading from Avro format at user level.
25
use std::collections::HashMap;
use std::convert::TryFrom;
use std::str::{from_utf8, FromStr};

use serde_json::from_slice;
use sha2::Sha256;

use crate::decode::{decode, AvroRead};
use crate::error::{DecodeError, Error as AvroError};
use crate::schema::{
    resolve_schemas, FullName, NamedSchemaPiece, ParseSchemaError, RecordField,
    ResolvedDefaultValueField, SchemaNodeOrNamed, SchemaPiece, SchemaPieceOrNamed,
    SchemaPieceRefOrNamed,
};
use crate::schema::{ResolvedRecordField, Schema};
use crate::types::{AvroMap, Value};
use crate::util::{self};
use crate::{Codec, SchemaResolutionError};
44
45#[derive(Debug, Clone)]
46pub(crate) struct Header {
47    writer_schema: Schema,
48    marker: [u8; 16],
49    codec: Codec,
50}
51
52impl Header {
53    pub fn from_reader<R: AvroRead>(reader: &mut R) -> Result<Header, AvroError> {
54        let meta_schema = Schema {
55            named: vec![],
56            indices: Default::default(),
57            top: SchemaPiece::Map(Box::new(SchemaPiece::Bytes.into())).into(),
58        };
59
60        let mut buf = [0u8; 4];
61        reader.read_exact(&mut buf)?;
62
63        if buf != [b'O', b'b', b'j', 1u8] {
64            return Err(AvroError::Decode(DecodeError::WrongHeaderMagic(buf)));
65        }
66
67        if let Value::Map(AvroMap(meta)) = decode(meta_schema.top_node(), reader)? {
68            // TODO: surface original parse schema errors instead of coalescing them here
69            let json = meta
70                .get("avro.schema")
71                .ok_or(AvroError::Decode(DecodeError::MissingAvroDotSchema))
72                .and_then(|bytes| {
73                    if let Value::Bytes(ref bytes) = *bytes {
74                        from_slice(bytes.as_ref()).map_err(|e| {
75                            AvroError::ParseSchema(ParseSchemaError::new(format!(
76                                "unable to decode schema bytes: {}",
77                                e
78                            )))
79                        })
80                    } else {
81                        unreachable!()
82                    }
83                })?;
84            let writer_schema = Schema::parse(&json).map_err(|e| {
85                ParseSchemaError::new(format!("unable to parse json as avro schema: {}", e))
86            })?;
87
88            let codec = meta
89                .get("avro.codec")
90                .map(|val| match val {
91                    Value::Bytes(ref bytes) => from_utf8(bytes.as_ref())
92                        .map_err(|_e| AvroError::Decode(DecodeError::CodecUtf8Error))
93                        .and_then(|codec| {
94                            Codec::from_str(codec).map_err(|_| {
95                                AvroError::Decode(DecodeError::UnrecognizedCodec(codec.to_string()))
96                            })
97                        }),
98                    _ => unreachable!(),
99                })
100                .unwrap_or(Ok(Codec::Null))?;
101
102            let mut marker = [0u8; 16];
103            reader.read_exact(&mut marker)?;
104
105            Ok(Header {
106                writer_schema,
107                marker,
108                codec,
109            })
110        } else {
111            unreachable!()
112        }
113    }
114
115    pub fn into_parts(self) -> (Schema, [u8; 16], Codec) {
116        (self.writer_schema, self.marker, self.codec)
117    }
118}
119
120pub struct Reader<R> {
121    header: Header,
122    inner: R,
123    errored: bool,
124    resolved_schema: Option<Schema>,
125    messages_remaining: usize,
126    // Internal buffering to reduce allocation.
127    buf: Vec<u8>,
128    buf_idx: usize,
129}
130
131/// An iterator over the `Block`s of a `Reader`
132pub struct BlockIter<R> {
133    inner: Reader<R>,
134}
135
/// One block of Avro objects taken from an object container file (OCF).
#[derive(Clone, Debug)]
pub struct Block {
    /// The block's bytes, already decompressed with the file's codec but not decoded.
    pub bytes: Vec<u8>,
    /// How many Avro objects are encoded in `bytes`.
    pub len: usize,
}
144
145impl<R: AvroRead> BlockIter<R> {
146    pub fn with_schema(reader_schema: &Schema, inner: R) -> Result<Self, AvroError> {
147        Ok(Self {
148            inner: Reader::with_schema(reader_schema, inner)?,
149        })
150    }
151}
152
153impl<R: AvroRead> Iterator for BlockIter<R> {
154    type Item = Result<Block, AvroError>;
155
156    fn next(&mut self) -> Option<Self::Item> {
157        assert!(self.inner.is_empty());
158
159        match self.inner.read_block_next() {
160            Ok(()) => {
161                if self.inner.is_empty() {
162                    None
163                } else {
164                    let bytes = std::mem::take(&mut self.inner.buf);
165                    let len = std::mem::take(&mut self.inner.messages_remaining);
166                    Some(Ok(Block { bytes, len }))
167                }
168            }
169            Err(e) => Some(Err(e)),
170        }
171    }
172}
173
174impl<R: AvroRead> Reader<R> {
175    /// Creates a `Reader` given something implementing the `tokio::io::AsyncRead` trait to read from.
176    /// No reader `Schema` will be set.
177    ///
178    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
179    pub fn new(mut inner: R) -> Result<Reader<R>, AvroError> {
180        let header = Header::from_reader(&mut inner)?;
181        let reader = Reader {
182            header,
183            inner,
184            errored: false,
185            resolved_schema: None,
186            messages_remaining: 0,
187            buf: vec![],
188            buf_idx: 0,
189        };
190        Ok(reader)
191    }
192
193    /// Creates a `Reader` given a reader `Schema` and something implementing the `tokio::io::AsyncRead` trait
194    /// to read from.
195    ///
196    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
197    pub fn with_schema(reader_schema: &Schema, mut inner: R) -> Result<Reader<R>, AvroError> {
198        let header = Header::from_reader(&mut inner)?;
199
200        let writer_schema = &header.writer_schema;
201        let resolved_schema = if reader_schema.fingerprint::<Sha256>().bytes
202            != writer_schema.fingerprint::<Sha256>().bytes
203        {
204            Some(resolve_schemas(&writer_schema, reader_schema)?)
205        } else {
206            None
207        };
208
209        Ok(Reader {
210            header,
211            errored: false,
212            resolved_schema,
213            inner,
214            messages_remaining: 0,
215            buf: vec![],
216            buf_idx: 0,
217        })
218    }
219
220    /// Get a reference to the writer `Schema`.
221    pub fn writer_schema(&self) -> &Schema {
222        &self.header.writer_schema
223    }
224
225    /// Get a reference to the resolved schema
226    /// (or just the writer schema, if no reader schema was provided
227    ///  or the two schemas are identical)
228    pub fn schema(&self) -> &Schema {
229        match &self.resolved_schema {
230            Some(schema) => schema,
231            None => self.writer_schema(),
232        }
233    }
234
235    #[inline]
236    /// Read the next Avro value from the file, if one exists.
237    pub fn read_next(&mut self) -> Result<Option<Value>, AvroError> {
238        if self.is_empty() {
239            self.read_block_next()?;
240            if self.is_empty() {
241                return Ok(None);
242            }
243        }
244
245        let mut block_bytes = &self.buf[self.buf_idx..];
246        let b_original = block_bytes.len();
247        let schema = self.schema();
248        let item = from_avro_datum(schema, &mut block_bytes)?;
249        self.buf_idx += b_original - block_bytes.len();
250        self.messages_remaining -= 1;
251        Ok(Some(item))
252    }
253
254    fn is_empty(&self) -> bool {
255        self.messages_remaining == 0
256    }
257
258    fn fill_buf(&mut self, n: usize) -> Result<(), AvroError> {
259        // We don't have enough space in the buffer, need to grow it.
260        if n >= self.buf.len() {
261            self.buf.resize(n, 0);
262        }
263
264        self.inner.read_exact(&mut self.buf[..n])?;
265        self.buf_idx = 0;
266        Ok(())
267    }
268
269    fn read_block_next(&mut self) -> Result<(), AvroError> {
270        assert!(self.is_empty(), "Expected self to be empty!");
271        match util::read_long(&mut self.inner) {
272            Ok(block_len) => {
273                self.messages_remaining = block_len as usize;
274                let block_bytes = util::read_long(&mut self.inner)?;
275                self.fill_buf(block_bytes as usize)?;
276                let mut marker = [0u8; 16];
277                self.inner.read_exact(&mut marker)?;
278
279                if marker != self.header.marker {
280                    return Err(DecodeError::MismatchedBlockHeader {
281                        expected: self.header.marker,
282                        actual: marker,
283                    }
284                    .into());
285                }
286
287                // NOTE (JAB): This doesn't fit this Reader pattern very well.
288                // `self.buf` is a growable buffer that is reused as the reader is iterated.
289                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
290                // and replace `buf` with the new one, instead of reusing the same buffer.
291                // We can address this by using some "limited read" type to decode directly
292                // into the buffer. But this is fine, for now.
293                self.header.codec.decompress(&mut self.buf)?;
294
295                Ok(())
296            }
297            Err(e) => {
298                if let AvroError::IO(std::io::ErrorKind::UnexpectedEof) = e {
299                    // to not return any error in case we only finished to read cleanly from the stream
300                    Ok(())
301                } else {
302                    Err(e)
303                }
304            }
305        }
306    }
307}
308
309impl<R: AvroRead> Iterator for Reader<R> {
310    type Item = Result<Value, AvroError>;
311
312    fn next(&mut self) -> Option<Self::Item> {
313        // to prevent continuing to read after the first error occurs
314        if self.errored {
315            return None;
316        };
317        match self.read_next() {
318            Ok(opt) => opt.map(Ok),
319            Err(e) => {
320                self.errored = true;
321                Some(Err(e))
322            }
323        }
324    }
325}
326
327pub struct SchemaResolver<'a> {
328    pub named: Vec<Option<NamedSchemaPiece>>,
329    pub indices: HashMap<FullName, usize>,
330    pub human_readable_field_path: Vec<String>,
331    pub current_human_readable_path_start: usize,
332    pub writer_to_reader_names: HashMap<usize, usize>,
333    pub reader_to_writer_names: HashMap<usize, usize>,
334    pub reader_to_resolved_names: HashMap<usize, usize>,
335    pub reader_fullnames: HashMap<usize, &'a FullName>,
336    pub reader_schema: &'a Schema,
337}
338
339impl<'a> SchemaResolver<'a> {
340    fn resolve_named(
341        &mut self,
342        writer: &Schema,
343        reader: &Schema,
344        writer_index: usize,
345        reader_index: usize,
346    ) -> Result<SchemaPiece, AvroError> {
347        let ws = writer.lookup(writer_index);
348        let rs = reader.lookup(reader_index);
349        let typ = match (&ws.piece, &rs.piece) {
350            (
351                SchemaPiece::Record {
352                    fields: w_fields,
353                    lookup: w_lookup,
354                    ..
355                },
356                SchemaPiece::Record {
357                    fields: r_fields,
358                    lookup: _r_lookup,
359                    ..
360                },
361            ) => {
362                let mut defaults = Vec::new();
363                let mut fields: Vec<Option<RecordField>> = Vec::new();
364                for (r_index, rf) in r_fields.iter().enumerate() {
365                    match w_lookup.get(&rf.name) {
366                        None => {
367                            let default_field = match &rf.default {
368                                Some(v) => ResolvedDefaultValueField {
369                                    name: rf.name.clone(),
370                                    doc: rf.doc.clone(),
371                                    default: reader
372                                        .top_node_or_named()
373                                        .step(&rf.schema)
374                                        .lookup()
375                                        .json_to_value(v)?,
376                                    order: rf.order.clone(),
377                                    position: r_index,
378                                },
379                                None => {
380                                    return Err(SchemaResolutionError::new(format!(
381                                    "Reader field `{}.{}` not found in writer, and has no default",
382                                    self.get_current_human_readable_path(),
383                                    rf.name
384                                ))
385                                    .into())
386                                }
387                            };
388                            defaults.push(default_field);
389                        }
390                        Some(w_index) => {
391                            if fields.len() > *w_index && fields[*w_index].is_some() {
392                                return Err(SchemaResolutionError::new(format!(
393                                    "Duplicate field `{}.{}` in schema",
394                                    self.get_current_human_readable_path(),
395                                    rf.name
396                                ))
397                                .into());
398                            }
399                            let wf = &w_fields[*w_index];
400                            let w_node = SchemaNodeOrNamed {
401                                root: writer,
402                                inner: wf.schema.as_ref(),
403                            };
404                            let r_node = SchemaNodeOrNamed {
405                                root: reader,
406                                inner: rf.schema.as_ref(),
407                            };
408
409                            self.human_readable_field_path.push(rf.name.clone());
410                            let new_inner = self.resolve(w_node, r_node)?;
411                            self.human_readable_field_path.pop();
412
413                            let field = RecordField {
414                                name: rf.name.clone(),
415                                doc: rf.doc.clone(),
416                                default: rf.default.clone(),
417                                schema: new_inner,
418                                order: rf.order.clone(),
419                                position: r_index,
420                            };
421                            while fields.len() <= *w_index {
422                                fields.push(None);
423                            }
424                            fields[*w_index] = Some(field)
425                        }
426                    }
427                }
428                while fields.len() < w_fields.len() {
429                    fields.push(None);
430                }
431                let mut n_present = 0;
432                let fields = fields
433                    .into_iter()
434                    .enumerate()
435                    .map(|(i, rf)| match rf {
436                        Some(rf) => {
437                            n_present += 1;
438                            ResolvedRecordField::Present(rf)
439                        }
440                        None => {
441                            // Clone the chunk of the writer schema appearing here.
442                            // We could probably be clever and avoid some cloning,
443                            // but absolute highest performance probably isn't important for schema resolution.
444                            //
445                            // The cloned writer schema piece is needed to guide decoding of the value,
446                            // since even though it doesn't appear in the reader schema it needs
447                            // to be decoded to know where it ends.
448                            //
449                            // TODO -- We could try to come up with a "Dummy" schema variant
450                            // that does only enough decoding to find the end of a value,
451                            // and maybe save some time.
452                            let writer_schema_piece = SchemaNodeOrNamed {
453                                root: writer,
454                                inner: w_fields[i].schema.as_ref(),
455                            }
456                            .to_schema();
457                            ResolvedRecordField::Absent(writer_schema_piece)
458                        }
459                    })
460                    .collect();
461                let n_reader_fields = defaults.len() + n_present;
462                SchemaPiece::ResolveRecord {
463                    defaults,
464                    fields,
465                    n_reader_fields,
466                }
467            }
468            (
469                SchemaPiece::Enum {
470                    symbols: w_symbols, ..
471                },
472                SchemaPiece::Enum {
473                    symbols: r_symbols,
474                    doc,
475                    default_idx,
476                },
477            ) => {
478                let r_map = r_symbols
479                    .iter()
480                    .enumerate()
481                    .map(|(i, s)| (s, i))
482                    .collect::<HashMap<_, _>>();
483                let symbols = w_symbols
484                    .iter()
485                    .map(|s| {
486                        r_map
487                            .get(s)
488                            .map(|i| (*i, s.clone()))
489                            .ok_or_else(|| s.clone())
490                    })
491                    .collect();
492                SchemaPiece::ResolveEnum {
493                    doc: doc.clone(),
494                    symbols,
495                    default: default_idx.map(|i| (i, r_symbols[i].clone())),
496                }
497            }
498            (SchemaPiece::Fixed { size: wsz }, SchemaPiece::Fixed { size: rsz }) => {
499                if *wsz == *rsz {
500                    SchemaPiece::Fixed { size: *wsz }
501                } else {
502                    return Err(SchemaResolutionError::new(format!(
503                        "Fixed schema {:?}: sizes don't match ({}, {}) for field `{}`",
504                        &rs.name,
505                        wsz,
506                        rsz,
507                        self.get_current_human_readable_path(),
508                    ))
509                    .into());
510                }
511            }
512            (
513                SchemaPiece::Decimal {
514                    precision: wp,
515                    scale: wscale,
516                    fixed_size: wsz,
517                },
518                SchemaPiece::Decimal {
519                    precision: rp,
520                    scale: rscale,
521                    fixed_size: rsz,
522                },
523            ) => {
524                if wp != rp {
525                    return Err(SchemaResolutionError::new(format!(
526                        "Decimal schema {:?}: precisions don't match: {}, {} for field `{}`",
527                        &rs.name,
528                        wp,
529                        rp,
530                        self.get_current_human_readable_path(),
531                    ))
532                    .into());
533                }
534                if wscale != rscale {
535                    return Err(SchemaResolutionError::new(format!(
536                        "Decimal schema {:?}: sizes don't match: {}, {} for field `{}`",
537                        &rs.name,
538                        wscale,
539                        rscale,
540                        self.get_current_human_readable_path(),
541                    ))
542                    .into());
543                }
544                if wsz != rsz {
545                    return Err(SchemaResolutionError::new(format!(
546                        "Decimal schema {:?}: sizes don't match: {:?}, {:?} for field `{}`",
547                        &rs.name,
548                        wsz,
549                        rsz,
550                        self.get_current_human_readable_path(),
551                    ))
552                    .into());
553                }
554                SchemaPiece::Decimal {
555                    precision: *wp,
556                    scale: *wscale,
557                    fixed_size: *wsz,
558                }
559            }
560            (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Fixed { size })
561                if *fixed_size == Some(*size) =>
562            {
563                SchemaPiece::Fixed { size: *size }
564            }
565            (
566                SchemaPiece::Fixed { size },
567                SchemaPiece::Decimal {
568                    precision,
569                    scale,
570                    fixed_size,
571                },
572            ) if *fixed_size == Some(*size) => SchemaPiece::Decimal {
573                precision: *precision,
574                scale: *scale,
575                fixed_size: *fixed_size,
576            },
577
578            (_, SchemaPiece::ResolveRecord { .. })
579            | (_, SchemaPiece::ResolveEnum { .. })
580            | (SchemaPiece::ResolveRecord { .. }, _)
581            | (SchemaPiece::ResolveEnum { .. }, _) => {
582                return Err(SchemaResolutionError::new(
583                    "Attempted to resolve an already resolved schema".to_string(),
584                )
585                .into());
586            }
587
588            (_wt, _rt) => {
589                return Err(SchemaResolutionError::new(format!(
590                    "Non-matching schemas: writer: {:?}, reader: {:?}",
591                    ws.name, rs.name
592                ))
593                .into())
594            }
595        };
596        Ok(typ)
597    }
598
599    pub fn resolve(
600        &mut self,
601        writer: SchemaNodeOrNamed,
602        reader: SchemaNodeOrNamed,
603    ) -> Result<SchemaPieceOrNamed, AvroError> {
604        let previous_human_readable_path_start = self.current_human_readable_path_start;
605        let (_, named_node) = reader.inner.get_piece_and_name(reader.root);
606        if let Some(full_name) = named_node {
607            self.current_human_readable_path_start = self.human_readable_field_path.len();
608            self.human_readable_field_path.push(full_name.human_name());
609        }
610
611        let inner = match (writer.inner, reader.inner) {
612            // Both schemas are unions - the most complicated case, but simpler than it looks.
613            // For each variant in the writer, we attempt to find a matching variant in the reader,
614            // either by type (for anonymous nodes) or by name (for named nodes).
615            //
616            // Having found a match, we resolve the writer variant against the reader variant,
617            // and record it in the resolved node.
618            //
619            // If either no match is found, or resolution on the matches fails, it is not an error
620            // -- it simply means that the corresponding entry in `permutation` will be `None`,
621            // and reading will fail if that variant is expressed. But
622            // reading variants that *do* match and resolve will still be possible.
623            //
624            // See the doc comment on `SchemaPiece::ResolveUnionUnion` for an explanation of the format of `permutation`.
625            (
626                SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)),
627                SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner)),
628            ) => {
629                let w2r = self.writer_to_reader_names.clone();
630                // permutation[1] is Some((j, val)) if the i'th writer variant
631                // _matches_ the j'th reader variant
632                // (i.e., it is the same primitive type, or the same kind of named type and has the same name, or a decimal with the same parameters)
633                // and successfully _resolves_ against it,
634                // and None otherwise.
635                //
636                // An example of types that match but don't resolve would be two records with the same name but incompatible fields.
637                let permutation = w_inner
638                    .variants()
639                    .iter()
640                    .map(|w_variant| {
641                        let (r_idx, r_variant) =
642                            r_inner.match_(w_variant, &w2r).ok_or_else(|| {
643                                SchemaResolutionError::new(format!(
644                                    "Failed to match writer union variant `{}` against any variant in the reader for field `{}`",
645                                    w_variant.get_human_name(writer.root),
646                                    self.get_current_human_readable_path()
647                                ))
648                            })?;
649                        let resolved =
650                            self.resolve(writer.step(w_variant), reader.step(r_variant))?;
651                        Ok((r_idx, resolved))
652                    })
653                    .collect();
654                let n_reader_variants = r_inner.variants().len();
655                let reader_null_variant = r_inner
656                    .variants()
657                    .iter()
658                    .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
659                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionUnion {
660                    permutation,
661                    n_reader_variants,
662                    reader_null_variant,
663                })
664            }
665            // Writer is concrete; reader is union
666            (other, SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner))) => {
667                let n_reader_variants = r_inner.variants().len();
668                let reader_null_variant = r_inner
669                    .variants()
670                    .iter()
671                    .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
672                let (index, r_inner) = r_inner
673                    .match_ref(other, &self.writer_to_reader_names)
674                    .ok_or_else(|| {
675                        SchemaResolutionError::new(
676                            format!("No matching schema in reader union for writer type `{}` for field `{}`",
677                                    other.get_human_name(writer.root),
678                                    self.get_current_human_readable_path()))
679                    })?;
680                let inner = Box::new(self.resolve(writer.step_ref(other), reader.step(r_inner))?);
681                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveConcreteUnion {
682                    index,
683                    inner,
684                    n_reader_variants,
685                    reader_null_variant,
686                })
687            }
688            // Writer is union; reader is concrete
689            (SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)), other) => {
690                let (index, w_inner) = w_inner
691                    .match_ref(other, &self.reader_to_writer_names)
692                    .ok_or_else(|| {
693                        SchemaResolutionError::new(
694                            format!("No matching schema in writer union for reader type `{}` for field `{}`",
695                                    other.get_human_name(writer.root),
696                                    self.get_current_human_readable_path()))
697                    })?;
698                let inner = Box::new(self.resolve(writer.step(w_inner), reader.step_ref(other))?);
699                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionConcrete { index, inner })
700            }
701            // Any other anonymous type.
702            (SchemaPieceRefOrNamed::Piece(wp), SchemaPieceRefOrNamed::Piece(rp)) => {
703                match (wp, rp) {
704                    // Normally for types that are underlyingly "long", we just interpret them according to the reader schema.
705                    // In this special case, it is better to interpret them according to the _writer_ schema:
706                    // By treating the written value as millis, we will decode the same DateTime values as were written.
707                    //
708                    // For example: if a writer wrote milliseconds and a reader tries to read it as microseconds,
709                    // it will be off by a factor of 1000 from the timestamp that the writer was intending to write
710                    (SchemaPiece::TimestampMilli, SchemaPiece::TimestampMicro) => {
711                        SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMilli)
712                    }
713                    // See above
714                    (SchemaPiece::TimestampMicro, SchemaPiece::TimestampMilli) => {
715                        SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMicro)
716                    }
717                    (SchemaPiece::Date, SchemaPiece::TimestampMilli)
718                    | (SchemaPiece::Date, SchemaPiece::TimestampMicro) => {
719                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveDateTimestamp)
720                    }
721                    (wp, rp) if wp.is_underlying_int() && rp.is_underlying_int() => {
722                        SchemaPieceOrNamed::Piece(rp.clone()) // This clone is just a copy - none of the underlying int/long types own heap memory.
723                    }
724                    (wp, rp) if wp.is_underlying_long() && rp.is_underlying_long() => {
725                        SchemaPieceOrNamed::Piece(rp.clone()) // see above comment
726                    }
727                    (wp, SchemaPiece::TimestampMilli) if wp.is_underlying_int() => {
728                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMilli)
729                    }
730                    (wp, SchemaPiece::TimestampMicro) if wp.is_underlying_int() => {
731                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMicro)
732                    }
733                    (SchemaPiece::Null, SchemaPiece::Null) => {
734                        SchemaPieceOrNamed::Piece(SchemaPiece::Null)
735                    }
736                    (SchemaPiece::Boolean, SchemaPiece::Boolean) => {
737                        SchemaPieceOrNamed::Piece(SchemaPiece::Boolean)
738                    }
739                    (SchemaPiece::Int, SchemaPiece::Long) => {
740                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntLong)
741                    }
742                    (SchemaPiece::Int, SchemaPiece::Float) => {
743                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntFloat)
744                    }
745                    (SchemaPiece::Int, SchemaPiece::Double) => {
746                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntDouble)
747                    }
748                    (SchemaPiece::Long, SchemaPiece::Float) => {
749                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongFloat)
750                    }
751                    (SchemaPiece::Long, SchemaPiece::Double) => {
752                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongDouble)
753                    }
754                    (SchemaPiece::Float, SchemaPiece::Float) => {
755                        SchemaPieceOrNamed::Piece(SchemaPiece::Float)
756                    }
757                    (SchemaPiece::Float, SchemaPiece::Double) => {
758                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveFloatDouble)
759                    }
760                    (SchemaPiece::Double, SchemaPiece::Double) => {
761                        SchemaPieceOrNamed::Piece(SchemaPiece::Double)
762                    }
763                    (b, SchemaPiece::Bytes)
764                        if b == &SchemaPiece::Bytes || b == &SchemaPiece::String =>
765                    {
766                        SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
767                    }
768                    (s, SchemaPiece::String)
769                        if s == &SchemaPiece::String || s == &SchemaPiece::Bytes =>
770                    {
771                        SchemaPieceOrNamed::Piece(SchemaPiece::String)
772                    }
773                    (SchemaPiece::Array(w_inner), SchemaPiece::Array(r_inner)) => {
774                        let inner =
775                            self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
776                        SchemaPieceOrNamed::Piece(SchemaPiece::Array(Box::new(inner)))
777                    }
778                    (SchemaPiece::Map(w_inner), SchemaPiece::Map(r_inner)) => {
779                        let inner =
780                            self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
781                        SchemaPieceOrNamed::Piece(SchemaPiece::Map(Box::new(inner)))
782                    }
783                    (
784                        SchemaPiece::Decimal {
785                            precision: wp,
786                            scale: ws,
787                            fixed_size: wf,
788                        },
789                        SchemaPiece::Decimal {
790                            precision: rp,
791                            scale: rs,
792                            fixed_size: rf,
793                        },
794                    ) => {
795                        if wp == rp && ws == rs && wf == rf {
796                            SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
797                                precision: *wp,
798                                scale: *ws,
799                                fixed_size: *wf,
800                            })
801                        } else {
802                            return Err(SchemaResolutionError::new(format!(
803                                "Decimal types must match in precision, scale, and fixed size. \
804                                Got ({:?}, {:?}, {:?}); ({:?}, {:?}. {:?}) for field `{}`",
805                                wp,
806                                ws,
807                                wf,
808                                rp,
809                                rs,
810                                rf,
811                                self.get_current_human_readable_path(),
812                            ))
813                            .into());
814                        }
815                    }
816                    (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Bytes)
817                        if *fixed_size == None =>
818                    {
819                        SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
820                    }
821                    // TODO [btv] We probably want to rethink what we're doing here, rather than just add
822                    // a new branch for every possible "logical" type. Perhaps logical types with the
823                    // same underlying type should always be resolvable to the reader schema's type?
824                    (SchemaPiece::Json, SchemaPiece::Json) => {
825                        SchemaPieceOrNamed::Piece(SchemaPiece::Json)
826                    }
827                    (SchemaPiece::Uuid, SchemaPiece::Uuid) => {
828                        SchemaPieceOrNamed::Piece(SchemaPiece::Uuid)
829                    }
830                    (
831                        SchemaPiece::Bytes,
832                        SchemaPiece::Decimal {
833                            precision,
834                            scale,
835                            fixed_size,
836                        },
837                    ) if *fixed_size == None => SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
838                        precision: *precision,
839                        scale: *scale,
840                        fixed_size: *fixed_size,
841                    }),
842                    (ws, rs) => {
843                        return Err(SchemaResolutionError::new(format!(
844                            "Writer schema has type `{:?}`, but reader schema has type `{:?}` for field `{}`",
845                            ws,
846                            rs,
847                            self.get_current_human_readable_path(),
848                        ))
849                        .into());
850                    }
851                }
852            }
853            // Named types
854            (SchemaPieceRefOrNamed::Named(w_index), SchemaPieceRefOrNamed::Named(r_index)) => {
855                if self.writer_to_reader_names.get(&w_index) != Some(&r_index) {
856                    // The nodes in the two schemas have different names. Resolution fails.
857                    let (w_name, r_name) = (
858                        &writer.root.lookup(w_index).name,
859                        &reader.root.lookup(r_index).name,
860                    );
861                    return Err(SchemaResolutionError::new(format!("Attempted to resolve writer schema node named {} against reader schema node named {}", w_name, r_name)).into());
862                }
863                // Check if we have already resolved the name previously, and if so, return a reference to
864                // it (in the new schema's namespace).
865                let idx = match self.reader_to_resolved_names.get(&r_index) {
866                    Some(resolved) => *resolved,
867                    None => {
868                        // We have not resolved this name yet; do so, and record it in the set of named schemas.
869                        // We need to push a placeholder beforehand, because schemas can be recursive;
870                        // a schema nested under this one may reference it.
871                        // A plausible example: {"type": "record", "name": "linked_list", "fields": [{"name": "next", "type": ["null", "linked_list"]}]}
872                        // Thus, `self.reader_to_resolved_names` needs to be correct for this node's index *before* we traverse the nodes under it.
873                        let resolved_idx = self.named.len();
874                        self.reader_to_resolved_names.insert(r_index, resolved_idx);
875                        self.named.push(None);
876                        let piece =
877                            match self.resolve_named(writer.root, reader.root, w_index, r_index) {
878                                Ok(piece) => piece,
879                                Err(e) => {
880                                    // clean up the placeholder values that were added above.
881                                    self.named.pop();
882                                    self.reader_to_resolved_names.remove(&r_index);
883                                    return Err(e);
884                                }
885                            };
886                        let name = &self.reader_schema.named[r_index].name;
887                        let ns = NamedSchemaPiece {
888                            name: name.clone(),
889                            piece,
890                        };
891                        self.named[resolved_idx] = Some(ns);
892                        self.indices.insert(name.clone(), resolved_idx);
893
894                        resolved_idx
895                    }
896                };
897                SchemaPieceOrNamed::Named(idx)
898            }
899            (ws, rs) => {
900                return Err(SchemaResolutionError::new(format!(
901                    "Schemas don't match: {:?}, {:?} for field `{}`",
902                    ws.get_piece_and_name(&writer.root).0,
903                    rs.get_piece_and_name(&reader.root).0,
904                    self.get_current_human_readable_path(),
905                ))
906                .into())
907            }
908        };
909        if named_node.is_some() {
910            self.human_readable_field_path.pop();
911            self.current_human_readable_path_start = previous_human_readable_path_start;
912        }
913        Ok(inner)
914    }
915
916    fn get_current_human_readable_path(&self) -> String {
917        return self.human_readable_field_path[self.current_human_readable_path_start..].join(".");
918    }
919}
920
921/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
922/// to read from.
923///
924/// In case a reader `Schema` is provided, schema resolution will also be performed.
925///
926/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
927/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
928/// you are doing, instead.
929pub fn from_avro_datum<R: AvroRead>(schema: &Schema, reader: &mut R) -> Result<Value, AvroError> {
930    let value = decode(schema.top_node(), reader)?;
931    Ok(value)
932}
933
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Record, ToAvro};
    use crate::Reader;

    use std::io::Cursor;

    static SCHEMA: &str = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
    static UNION_SCHEMA: &str = r#"
            ["null", "long"]
        "#;
    // An Avro object container file: `Obj\x01` magic, a metadata map holding
    // `avro.schema` (a compact form of `SCHEMA`) and `avro.codec` = `null`, a 16-byte
    // sync marker, then one block of two records (`{a: 27, b: "foo"}` and
    // `{a: 42, b: "bar"}`) followed by the sync marker again.
    static ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    /// Resolves `writer` against `reader`, expecting a schema-resolution failure, and
    /// returns the failure's message. Panics if resolution succeeds or fails differently.
    fn resolution_error_message(writer: &Schema, reader: &Schema) -> String {
        match resolve_schemas(writer, reader) {
            Err(AvroError::ResolveSchema(SchemaResolutionError(message))) => message,
            _ => panic!("Expected schema resolution failure"),
        }
    }

    #[test]
    fn test_from_avro_datum() {
        let schema: Schema = SCHEMA.parse().unwrap();
        // `a` = 27 (zig-zag varint 54), then `b` = "foo" (zig-zag length 6, then the bytes).
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.avro();

        assert_eq!(from_avro_datum(&schema, &mut encoded).unwrap(), expected);
    }

    #[test]
    fn test_null_union() {
        let schema: Schema = UNION_SCHEMA.parse().unwrap();
        // Union branch index 1 (zig-zag 2), i.e. `long`, holding 0.
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded).unwrap(),
            Value::Union {
                index: 1,
                inner: Box::new(Value::Long(0)),
                n_variants: 2,
                null_variant: Some(0)
            }
        );
    }

    #[test]
    fn test_reader_stream() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let reader = Reader::with_schema(&schema, ENCODED).unwrap();

        let mut record1 = Record::new(schema.top_node()).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(schema.top_node()).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected = vec![record1.avro(), record2.avro()];

        // Compare the complete output, so a reader that yields too few (or no) values
        // fails the test instead of passing vacuously.
        let values = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(values, expected);
    }

    #[test]
    fn test_reader_invalid_header() {
        let schema: Schema = SCHEMA.parse().unwrap();
        // Dropping the first byte corrupts the `Obj\x01` magic.
        assert!(Reader::with_schema(&schema, &ENCODED[1..]).is_err());
    }

    #[test]
    fn test_reader_invalid_block() {
        let schema: Schema = SCHEMA.parse().unwrap();
        // Drop the trailing 16-byte sync marker plus the final 3 bytes of block data.
        let invalid = &ENCODED[..ENCODED.len() - 19];
        let reader = Reader::with_schema(&schema, invalid).unwrap();
        for value in reader {
            assert!(value.is_err());
        }
    }

    #[test]
    fn test_reader_empty_buffer() {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());
    }

    #[test]
    fn test_reader_only_header() {
        // Keep the header, plus the block's object count and byte size, but none of the
        // block's data.
        let invalid = &ENCODED[..165];
        let reader = Reader::new(invalid).unwrap();
        for value in reader {
            assert!(value.is_err());
        }
    }

    #[test]
    fn test_resolution_nested_types_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "int"}]}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "double"}]}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = resolution_error_message(&w, &r);
        // The field name here must NOT contain `com.materialize.foo`,
        // because explicitly named types are all relative to a global
        // namespace (i.e., they don't nest).
        assert_eq!(&err_str, "Writer schema has type `Double`, but reader schema has type `Int` for field `com.materialize.bar.f1_1`");
    }

    #[test]
    fn test_extra_fields_without_default_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f2", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = resolution_error_message(&w, &r);
        assert_eq!(
            &err_str,
            "Reader field `com.materialize.foo.f2` not found in writer, and has no default"
        );
    }

    #[test]
    fn test_duplicate_field_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = resolution_error_message(&w, &r);
        assert_eq!(
            &err_str,
            "Duplicate field `com.materialize.bar.f1` in schema"
        );
    }

    #[test]
    fn test_decimal_field_mismatch_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 1}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = resolution_error_message(&w, &r);
        // NOTE: the `.` after the reader's scale mirrors the current decimal-mismatch format
        // string in the resolution code (`{:?}. {:?}`); update both together if it is fixed.
        assert_eq!(
            &err_str,
            "Decimal types must match in precision, scale, and fixed size. Got (5, 1, None); (4, 2. None) for field `com.materialize.foo.f1`"
        );
    }
}