mz_avro/
error.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24use crate::types::ScalarKind;
25use crate::{util::TsUnit, ParseSchemaError, SchemaResolutionError};
26
27use fmt::{Debug, Display};
28use std::fmt;
29
30#[derive(Clone, Debug, Eq, PartialEq)]
31pub enum DecodeError {
32    CodecUtf8Error,
33    MapKeyUtf8Error,
34    StringUtf8Error,
35    UuidUtf8Error,
36    UnrecognizedCodec(String),
37    BadSnappyChecksum {
38        expected: u32,
39        actual: u32,
40    },
41    ExpectedNonnegInteger(i64),
42    BadTimestamp {
43        unit: TsUnit,
44        seconds: i64,
45        fraction: u32,
46    },
47    BadBoolean(u8),
48    BadDate(i32),
49    // The distinction between "bad" and "missing",
50    // for both unions and enums,
51    // is that a "bad" index was not found in the writer schema,
52    // meaning either the input is corrupt or there is a bug in this crate,
53    // whereas a "missing" index means the value was validly written,
54    // but can't be interpreted by the _reader_ schema
55    BadUnionIndex {
56        index: usize,
57        len: usize,
58    },
59    MissingUnionIndex(usize),
60    BadEnumIndex {
61        index: usize,
62        len: usize,
63    },
64    MissingEnumIndex {
65        index: usize,
66        symbol: String,
67    },
68    WrongUnionIndex {
69        expected: usize,
70        actual: usize,
71    },
72    UnexpectedRecord,
73    UnexpectedUnion,
74    UnexpectedArray,
75    UnexpectedMap,
76    UnexpectedEnum,
77    UnexpectedScalar,
78    UnexpectedDecimal,
79    UnexpectedBytes,
80    UnexpectedString,
81    UnexpectedJson,
82    UnexpectedUuid,
83    UnexpectedFixed,
84    UnexpectedScalarKind(ScalarKind),
85    WrongHeaderMagic([u8; 4]),
86    MissingAvroDotSchema,
87    I32OutOfRange(i64),
88    IntConversionError,
89    IntDecodeOverflow,
90    BadJson(serde_json::error::Category),
91    BadUuid(uuid::Error),
92    MismatchedBlockHeader {
93        expected: [u8; 16],
94        actual: [u8; 16],
95    },
96    Custom(String),
97}
98
99impl DecodeError {
100    fn fmt_inner(&self, f: &mut fmt::Formatter) -> fmt::Result {
101        match self {
102            DecodeError::UnrecognizedCodec(codec) => write!(f, "Unrecognized codec: {}", codec),
103            DecodeError::BadSnappyChecksum { expected, actual } => write!(
104                f,
105                "Bad Snappy CRC32; expected {:x} but got {:x}",
106                expected, actual
107            ),
108            DecodeError::ExpectedNonnegInteger(i) => {
109                write!(f, "Expected non-negative integer, got {}", i)
110            }
111            DecodeError::BadTimestamp {
112                unit,
113                seconds,
114                fraction,
115            } => write!(f, "Invalid {} timestamp {}.{}", unit, seconds, fraction),
116            DecodeError::BadBoolean(byte) => write!(f, "Invalid boolean: {:x}", byte),
117            DecodeError::BadDate(since_epoch) => {
118                write!(f, "Invalid num days since epoch: {}", since_epoch)
119            }
120            DecodeError::BadUnionIndex { index, len } => {
121                write!(f, "Union index out of bounds: {} (len: {})", index, len)
122            }
123            DecodeError::MissingUnionIndex(index) => {
124                write!(f, "Union variant not found in reader schema: {}", index)
125            }
126            DecodeError::BadEnumIndex { index, len } => write!(
127                f,
128                "Enum symbol index out of bounds: {} (len: {})",
129                index, len
130            ),
131            DecodeError::MissingEnumIndex { index, symbol } => write!(
132                f,
133                "Enum symbol {} at index {} in writer schema not found in reader",
134                symbol, index
135            ),
136            DecodeError::UnexpectedRecord => write!(f, "Unexpected record"),
137            DecodeError::UnexpectedUnion => write!(f, "Unexpected union"),
138            DecodeError::UnexpectedArray => write!(f, "Unexpected array"),
139            DecodeError::UnexpectedMap => write!(f, "Unexpected map"),
140            DecodeError::UnexpectedEnum => write!(f, "Unexpected enum"),
141            DecodeError::UnexpectedScalar => write!(f, "Unexpected scalar"),
142            DecodeError::UnexpectedDecimal => write!(f, "Unexpected decimal"),
143            DecodeError::UnexpectedBytes => write!(f, "Unexpected bytes"),
144            DecodeError::UnexpectedString => write!(f, "Unexpected string"),
145            DecodeError::UnexpectedJson => write!(f, "Unexpected json"),
146            DecodeError::UnexpectedUuid => write!(f, "Unexpected UUID"),
147            DecodeError::UnexpectedFixed => write!(f, "Unexpected fixed"),
148            DecodeError::UnexpectedScalarKind(kind) => {
149                write!(f, "Scalar of unexpected kind: {:?}", kind)
150            }
151            DecodeError::WrongHeaderMagic(magic) => write!(f, "Wrong header magic: {:x?}", magic),
152            DecodeError::MissingAvroDotSchema => write!(
153                f,
154                "Symbol's value as variable is void: avro.schema missing from header"
155            ),
156            DecodeError::I32OutOfRange(i) => write!(f, "Expected i32, got: {}", i),
157            DecodeError::IntDecodeOverflow => write!(f, "Overflow when decoding integer value"),
158            DecodeError::WrongUnionIndex { expected, actual } => write!(
159                f,
160                "Reader expected variant at index {}, got {}",
161                expected, actual
162            ),
163            DecodeError::Custom(inner) => write!(f, "Error in decode client: {}", inner),
164            DecodeError::CodecUtf8Error => write!(f, "Codec was not valid UTF-8"),
165            DecodeError::MapKeyUtf8Error => write!(f, "Map key was not valid UTF-8"),
166            DecodeError::StringUtf8Error => write!(f, "String was not valid UTF-8"),
167            DecodeError::UuidUtf8Error => write!(f, "UUID was not valid UTF-8"),
168            DecodeError::IntConversionError => write!(f, "Integer conversion failed"),
169            DecodeError::BadJson(inner_kind) => write!(f, "Json decoding failed: {:?}", inner_kind),
170            DecodeError::BadUuid(inner) => write!(f, "UUID decoding failed: {}", inner),
171            DecodeError::MismatchedBlockHeader { expected, actual } => write!(
172                f,
173                "Block marker ({:x?}) does not match header marker ({:x?})",
174                actual, expected
175            ),
176        }
177    }
178}
179
180impl Display for DecodeError {
181    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182        write!(f, "Decoding error: ")?;
183        self.fmt_inner(f)
184    }
185}
186
187#[derive(Clone, Debug, Eq, PartialEq)]
188// TODO (btv) - some context (where in the record the error occurred) would be nice.
189// We can probably get this from the schema; we would just need to pipe it through a ton of places.
190pub enum Error {
191    Decode(DecodeError),
192    ParseSchema(ParseSchemaError), // TODO (btv) - make this a typed enum, like we did for DecodeError.
193    ResolveSchema(SchemaResolutionError), // TODO (btv) - idem.
194    IO(std::io::ErrorKind),        // Keeping the full error would be nicer, but that isn't `Clone`.
195    Allocation { attempted: usize, allowed: usize },
196}
197
198impl From<std::io::Error> for Error {
199    fn from(e: std::io::Error) -> Self {
200        Self::IO(e.kind())
201    }
202}
203
204impl From<std::convert::Infallible> for Error {
205    fn from(_: std::convert::Infallible) -> Self {
206        unreachable!()
207    }
208}
209
210impl From<std::num::TryFromIntError> for Error {
211    fn from(_: std::num::TryFromIntError) -> Self {
212        Self::Decode(DecodeError::IntConversionError)
213    }
214}
215
216impl From<DecodeError> for Error {
217    fn from(inner: DecodeError) -> Self {
218        Self::Decode(inner)
219    }
220}
221
222impl From<ParseSchemaError> for Error {
223    fn from(inner: ParseSchemaError) -> Self {
224        Self::ParseSchema(inner)
225    }
226}
227
228impl From<SchemaResolutionError> for Error {
229    fn from(inner: SchemaResolutionError) -> Self {
230        Self::ResolveSchema(inner)
231    }
232}
233
234impl Display for Error {
235    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236        match self {
237            Error::Decode(inner) => write!(f, "Decode error: {}", inner),
238            Error::ParseSchema(inner) => write!(f, "Schema parse error: {}", inner),
239            Error::IO(inner_kind) => write!(f, "IO error: {:?}", inner_kind),
240            Error::Allocation { attempted, allowed } => write!(
241                f,
242                "Allocation error: attempt to allocate {} bytes (maximum allowed: {})",
243                attempted, allowed
244            ),
245            Error::ResolveSchema(inner) => write!(f, "Schema resolution error: {}", inner),
246        }
247    }
248}
249
250impl std::error::Error for Error {}