apache_avro/
types.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling the intermediate representation of Avro values.
19use crate::{
20    bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
21    decimal::Decimal,
22    duration::Duration,
23    schema::{
24        DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
25        RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
26    },
27    AvroResult, Error,
28};
29use bigdecimal::BigDecimal;
30use log::{debug, error};
31use serde_json::{Number, Value as JsonValue};
32use std::{
33    borrow::Borrow,
34    collections::{BTreeMap, HashMap},
35    fmt::Debug,
36    hash::BuildHasher,
37    str::FromStr,
38};
39use uuid::Uuid;
40
41/// Compute the maximum decimal value precision of a byte array of length `len` could hold.
42fn max_prec_for_len(len: usize) -> Result<usize, Error> {
43    let len = i32::try_from(len).map_err(|e| Error::ConvertLengthToI32(e, len))?;
44    Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
45}
46
47/// A valid Avro value.
48///
49/// More information about Avro values can be found in the [Avro
50/// Specification](https://avro.apache.org/docs/current/specification/#schema-declaration)
51#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
52#[strum_discriminants(name(ValueKind))]
53pub enum Value {
54    /// A `null` Avro value.
55    Null,
56    /// A `boolean` Avro value.
57    Boolean(bool),
58    /// A `int` Avro value.
59    Int(i32),
60    /// A `long` Avro value.
61    Long(i64),
62    /// A `float` Avro value.
63    Float(f32),
64    /// A `double` Avro value.
65    Double(f64),
66    /// A `bytes` Avro value.
67    Bytes(Vec<u8>),
68    /// A `string` Avro value.
69    String(String),
70    /// A `fixed` Avro value.
71    /// The size of the fixed value is represented as a `usize`.
72    Fixed(usize, Vec<u8>),
73    /// An `enum` Avro value.
74    ///
75    /// An Enum is represented by a symbol and its position in the symbols list
76    /// of its corresponding schema.
77    /// This allows schema-less encoding, as well as schema resolution while
78    /// reading values.
79    Enum(u32, String),
80    /// An `union` Avro value.
81    ///
82    /// A Union is represented by the value it holds and its position in the type list
83    /// of its corresponding schema
84    /// This allows schema-less encoding, as well as schema resolution while
85    /// reading values.
86    Union(u32, Box<Value>),
87    /// An `array` Avro value.
88    Array(Vec<Value>),
89    /// A `map` Avro value.
90    Map(HashMap<String, Value>),
91    /// A `record` Avro value.
92    ///
93    /// A Record is represented by a vector of (`<record name>`, `value`).
94    /// This allows schema-less encoding.
95    ///
96    /// See [Record](types.Record) for a more user-friendly support.
97    Record(Vec<(String, Value)>),
98    /// A date value.
99    ///
100    /// Serialized and deserialized as `i32` directly. Can only be deserialized properly with a
101    /// schema.
102    Date(i32),
103    /// An Avro Decimal value. Bytes are in big-endian order, per the Avro spec.
104    Decimal(Decimal),
105    /// An Avro Decimal value.
106    BigDecimal(BigDecimal),
107    /// Time in milliseconds.
108    TimeMillis(i32),
109    /// Time in microseconds.
110    TimeMicros(i64),
111    /// Timestamp in milliseconds.
112    TimestampMillis(i64),
113    /// Timestamp in microseconds.
114    TimestampMicros(i64),
115    /// Timestamp in nanoseconds.
116    TimestampNanos(i64),
117    /// Local timestamp in milliseconds.
118    LocalTimestampMillis(i64),
119    /// Local timestamp in microseconds.
120    LocalTimestampMicros(i64),
121    /// Local timestamp in nanoseconds.
122    LocalTimestampNanos(i64),
123    /// Avro Duration. An amount of time defined by months, days and milliseconds.
124    Duration(Duration),
125    /// Universally unique identifier.
126    Uuid(Uuid),
127}
128
129/// Any structure implementing the [ToAvro](trait.ToAvro.html) trait will be usable
130/// from a [Writer](../writer/struct.Writer.html).
131#[deprecated(
132    since = "0.11.0",
133    note = "Please use Value::from, Into::into or value.into() instead"
134)]
135pub trait ToAvro {
136    /// Transforms this value into an Avro-compatible [Value](enum.Value.html).
137    fn avro(self) -> Value;
138}
139
140#[allow(deprecated)]
141impl<T: Into<Value>> ToAvro for T {
142    fn avro(self) -> Value {
143        self.into()
144    }
145}
146
147macro_rules! to_value(
148    ($type:ty, $variant_constructor:expr) => (
149        impl From<$type> for Value {
150            fn from(value: $type) -> Self {
151                $variant_constructor(value)
152            }
153        }
154    );
155);
156
157to_value!(bool, Value::Boolean);
158to_value!(i32, Value::Int);
159to_value!(i64, Value::Long);
160to_value!(f32, Value::Float);
161to_value!(f64, Value::Double);
162to_value!(String, Value::String);
163to_value!(Vec<u8>, Value::Bytes);
164to_value!(uuid::Uuid, Value::Uuid);
165to_value!(Decimal, Value::Decimal);
166to_value!(BigDecimal, Value::BigDecimal);
167to_value!(Duration, Value::Duration);
168
169impl From<()> for Value {
170    fn from(_: ()) -> Self {
171        Self::Null
172    }
173}
174
175impl From<usize> for Value {
176    fn from(value: usize) -> Self {
177        i64::try_from(value)
178            .expect("cannot convert usize to i64")
179            .into()
180    }
181}
182
183impl From<&str> for Value {
184    fn from(value: &str) -> Self {
185        Self::String(value.to_owned())
186    }
187}
188
189impl From<&[u8]> for Value {
190    fn from(value: &[u8]) -> Self {
191        Self::Bytes(value.to_owned())
192    }
193}
194
195impl<T> From<Option<T>> for Value
196where
197    T: Into<Self>,
198{
199    fn from(value: Option<T>) -> Self {
200        // FIXME: this is incorrect in case first type in union is not "none"
201        Self::Union(
202            value.is_some() as u32,
203            Box::new(value.map_or_else(|| Self::Null, Into::into)),
204        )
205    }
206}
207
208impl<K, V, S> From<HashMap<K, V, S>> for Value
209where
210    K: Into<String>,
211    V: Into<Self>,
212    S: BuildHasher,
213{
214    fn from(value: HashMap<K, V, S>) -> Self {
215        Self::Map(
216            value
217                .into_iter()
218                .map(|(key, value)| (key.into(), value.into()))
219                .collect(),
220        )
221    }
222}
223
224/// Utility interface to build `Value::Record` objects.
225#[derive(Debug, Clone)]
226pub struct Record<'a> {
227    /// List of fields contained in the record.
228    /// Ordered according to the fields in the schema given to create this
229    /// `Record` object. Any unset field defaults to `Value::Null`.
230    pub fields: Vec<(String, Value)>,
231    schema_lookup: &'a BTreeMap<String, usize>,
232}
233
234impl Record<'_> {
235    /// Create a `Record` given a `Schema`.
236    ///
237    /// If the `Schema` is not a `Schema::Record` variant, `None` will be returned.
238    pub fn new(schema: &Schema) -> Option<Record> {
239        match *schema {
240            Schema::Record(RecordSchema {
241                fields: ref schema_fields,
242                lookup: ref schema_lookup,
243                ..
244            }) => {
245                let mut fields = Vec::with_capacity(schema_fields.len());
246                for schema_field in schema_fields.iter() {
247                    fields.push((schema_field.name.clone(), Value::Null));
248                }
249
250                Some(Record {
251                    fields,
252                    schema_lookup,
253                })
254            }
255            _ => None,
256        }
257    }
258
259    /// Put a compatible value (implementing the `ToAvro` trait) in the
260    /// `Record` for a given `field` name.
261    ///
262    /// **NOTE** Only ensure that the field name is present in the `Schema` given when creating
263    /// this `Record`. Does not perform any schema validation.
264    pub fn put<V>(&mut self, field: &str, value: V)
265    where
266        V: Into<Value>,
267    {
268        if let Some(&position) = self.schema_lookup.get(field) {
269            self.fields[position].1 = value.into()
270        }
271    }
272
273    /// Get the value for a given field name.
274    /// Returns `None` if the field is not present in the schema
275    pub fn get(&self, field: &str) -> Option<&Value> {
276        self.schema_lookup
277            .get(field)
278            .map(|&position| &self.fields[position].1)
279    }
280}
281
282impl<'a> From<Record<'a>> for Value {
283    fn from(value: Record<'a>) -> Self {
284        Self::Record(value.fields)
285    }
286}
287
288impl From<JsonValue> for Value {
289    fn from(value: JsonValue) -> Self {
290        match value {
291            JsonValue::Null => Self::Null,
292            JsonValue::Bool(b) => b.into(),
293            JsonValue::Number(ref n) if n.is_i64() => {
294                let n = n.as_i64().unwrap();
295                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
296                    Value::Int(n as i32)
297                } else {
298                    Value::Long(n)
299                }
300            }
301            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
302            JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), // TODO: Not so great
303            JsonValue::String(s) => s.into(),
304            JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
305            JsonValue::Object(items) => Value::Map(
306                items
307                    .into_iter()
308                    .map(|(key, value)| (key, value.into()))
309                    .collect(),
310            ),
311        }
312    }
313}
314
315/// Convert Avro values to Json values
316impl TryFrom<Value> for JsonValue {
317    type Error = crate::error::Error;
318    fn try_from(value: Value) -> AvroResult<Self> {
319        match value {
320            Value::Null => Ok(Self::Null),
321            Value::Boolean(b) => Ok(Self::Bool(b)),
322            Value::Int(i) => Ok(Self::Number(i.into())),
323            Value::Long(l) => Ok(Self::Number(l.into())),
324            Value::Float(f) => Number::from_f64(f.into())
325                .map(Self::Number)
326                .ok_or_else(|| Error::ConvertF64ToJson(f.into())),
327            Value::Double(d) => Number::from_f64(d)
328                .map(Self::Number)
329                .ok_or(Error::ConvertF64ToJson(d)),
330            Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
331            Value::String(s) => Ok(Self::String(s)),
332            Value::Fixed(_size, items) => {
333                Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
334            }
335            Value::Enum(_i, s) => Ok(Self::String(s)),
336            Value::Union(_i, b) => Self::try_from(*b),
337            Value::Array(items) => items
338                .into_iter()
339                .map(Self::try_from)
340                .collect::<Result<Vec<_>, _>>()
341                .map(Self::Array),
342            Value::Map(items) => items
343                .into_iter()
344                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
345                .collect::<Result<Vec<_>, _>>()
346                .map(|v| Self::Object(v.into_iter().collect())),
347            Value::Record(items) => items
348                .into_iter()
349                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
350                .collect::<Result<Vec<_>, _>>()
351                .map(|v| Self::Object(v.into_iter().collect())),
352            Value::Date(d) => Ok(Self::Number(d.into())),
353            Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
354                .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
355            Value::BigDecimal(ref bg) => {
356                let vec1: Vec<u8> = serialize_big_decimal(bg)?;
357                Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
358            }
359            Value::TimeMillis(t) => Ok(Self::Number(t.into())),
360            Value::TimeMicros(t) => Ok(Self::Number(t.into())),
361            Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
362            Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
363            Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
364            Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
365            Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
366            Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
367            Value::Duration(d) => Ok(Self::Array(
368                <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
369            )),
370            Value::Uuid(uuid) => Ok(Self::String(uuid.as_hyphenated().to_string())),
371        }
372    }
373}
374
375impl Value {
376    /// Validate the value against the given [Schema](../schema/enum.Schema.html).
377    ///
378    /// See the [Avro specification](https://avro.apache.org/docs/current/specification)
379    /// for the full set of rules of schema validation.
380    pub fn validate(&self, schema: &Schema) -> bool {
381        self.validate_schemata(vec![schema])
382    }
383
384    pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
385        let rs = ResolvedSchema::try_from(schemata.clone())
386            .expect("Schemata didn't successfully resolve");
387        let schemata_len = schemata.len();
388        schemata.iter().any(|schema| {
389            let enclosing_namespace = schema.namespace();
390
391            match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
392                Some(reason) => {
393                    let log_message =
394                        format!("Invalid value: {self:?} for schema: {schema:?}. Reason: {reason}");
395                    if schemata_len == 1 {
396                        error!("{log_message}");
397                    } else {
398                        debug!("{log_message}");
399                    };
400                    false
401                }
402                None => true,
403            }
404        })
405    }
406
407    fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
408        match (accumulator, other) {
409            (None, None) => None,
410            (None, s @ Some(_)) => s,
411            (s @ Some(_), None) => s,
412            (Some(reason1), Some(reason2)) => Some(format!("{reason1}\n{reason2}")),
413        }
414    }
415
416    /// Validates the value against the provided schema.
417    pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
418        &self,
419        schema: &Schema,
420        names: &HashMap<Name, S>,
421        enclosing_namespace: &Namespace,
422    ) -> Option<String> {
423        match (self, schema) {
424            (_, Schema::Ref { name }) => {
425                let name = name.fully_qualified_name(enclosing_namespace);
426                names.get(&name).map_or_else(
427                    || {
428                        Some(format!(
429                            "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
430                            name,
431                            names.keys()
432                        ))
433                    },
434                    |s| self.validate_internal(s.borrow(), names, &name.namespace),
435                )
436            }
437            (&Value::Null, &Schema::Null) => None,
438            (&Value::Boolean(_), &Schema::Boolean) => None,
439            (&Value::Int(_), &Schema::Int) => None,
440            (&Value::Int(_), &Schema::Date) => None,
441            (&Value::Int(_), &Schema::TimeMillis) => None,
442            (&Value::Int(_), &Schema::Long) => None,
443            (&Value::Long(_), &Schema::Long) => None,
444            (&Value::Long(_), &Schema::TimeMicros) => None,
445            (&Value::Long(_), &Schema::TimestampMillis) => None,
446            (&Value::Long(_), &Schema::TimestampMicros) => None,
447            (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
448            (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
449            (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
450            (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
451            (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
452            (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
453            (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
454            (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
455            (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
456            (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
457            (&Value::Date(_), &Schema::Date) => None,
458            (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
459            (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
460            (&Value::Duration(_), &Schema::Duration) => None,
461            (&Value::Uuid(_), &Schema::Uuid) => None,
462            (&Value::Float(_), &Schema::Float) => None,
463            (&Value::Float(_), &Schema::Double) => None,
464            (&Value::Double(_), &Schema::Double) => None,
465            (&Value::Bytes(_), &Schema::Bytes) => None,
466            (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
467            (&Value::String(_), &Schema::String) => None,
468            (&Value::String(_), &Schema::Uuid) => None,
469            (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => {
470                if n != size {
471                    Some(format!(
472                        "The value's size ({n}) is different than the schema's size ({size})"
473                    ))
474                } else {
475                    None
476                }
477            }
478            (Value::Bytes(b), &Schema::Fixed(FixedSchema { size, .. })) => {
479                if b.len() != size {
480                    Some(format!(
481                        "The bytes' length ({}) is different than the schema's size ({})",
482                        b.len(),
483                        size
484                    ))
485                } else {
486                    None
487                }
488            }
489            (&Value::Fixed(n, _), &Schema::Duration) => {
490                if n != 12 {
491                    Some(format!(
492                        "The value's size ('{n}') must be exactly 12 to be a Duration"
493                    ))
494                } else {
495                    None
496                }
497            }
498            // TODO: check precision against n
499            (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
500            (Value::String(s), Schema::Enum(EnumSchema { symbols, .. })) => {
501                if !symbols.contains(s) {
502                    Some(format!("'{s}' is not a member of the possible symbols"))
503                } else {
504                    None
505                }
506            }
507            (
508                &Value::Enum(i, ref s),
509                Schema::Enum(EnumSchema {
510                    symbols, default, ..
511                }),
512            ) => symbols
513                .get(i as usize)
514                .map(|ref symbol| {
515                    if symbol != &s {
516                        Some(format!("Symbol '{s}' is not at position '{i}'"))
517                    } else {
518                        None
519                    }
520                })
521                .unwrap_or_else(|| match default {
522                    Some(_) => None,
523                    None => Some(format!("No symbol at position '{i}'")),
524                }),
525            // (&Value::Union(None), &Schema::Union(_)) => None,
526            (&Value::Union(i, ref value), Schema::Union(inner)) => inner
527                .variants()
528                .get(i as usize)
529                .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
530                .unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
531            (v, Schema::Union(inner)) => {
532                match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
533                    Some(_) => None,
534                    None => Some("Could not find matching type in union".to_string()),
535                }
536            }
537            (Value::Array(items), Schema::Array(inner)) => items.iter().fold(None, |acc, item| {
538                Value::accumulate(
539                    acc,
540                    item.validate_internal(&inner.items, names, enclosing_namespace),
541                )
542            }),
543            (Value::Map(items), Schema::Map(inner)) => {
544                items.iter().fold(None, |acc, (_, value)| {
545                    Value::accumulate(
546                        acc,
547                        value.validate_internal(&inner.types, names, enclosing_namespace),
548                    )
549                })
550            }
551            (
552                Value::Record(record_fields),
553                Schema::Record(RecordSchema {
554                    fields,
555                    lookup,
556                    name,
557                    ..
558                }),
559            ) => {
560                let non_nullable_fields_count =
561                    fields.iter().filter(|&rf| !rf.is_nullable()).count();
562
563                // If the record contains fewer fields as required fields by the schema, it is invalid.
564                if record_fields.len() < non_nullable_fields_count {
565                    return Some(format!(
566                        "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
567                        record_fields.len(),
568                        non_nullable_fields_count
569                    ));
570                } else if record_fields.len() > fields.len() {
571                    return Some(format!(
572                        "The value's records length ({}) is greater than the schema's ({} fields)",
573                        record_fields.len(),
574                        fields.len(),
575                    ));
576                }
577
578                record_fields
579                    .iter()
580                    .fold(None, |acc, (field_name, record_field)| {
581                        let record_namespace = if name.namespace.is_none() {
582                            enclosing_namespace
583                        } else {
584                            &name.namespace
585                        };
586                        match lookup.get(field_name) {
587                            Some(idx) => {
588                                let field = &fields[*idx];
589                                Value::accumulate(
590                                    acc,
591                                    record_field.validate_internal(
592                                        &field.schema,
593                                        names,
594                                        record_namespace,
595                                    ),
596                                )
597                            }
598                            None => Value::accumulate(
599                                acc,
600                                Some(format!("There is no schema field for field '{field_name}'")),
601                            ),
602                        }
603                    })
604            }
605            (Value::Map(items), Schema::Record(RecordSchema { fields, .. })) => {
606                fields.iter().fold(None, |acc, field| {
607                    if let Some(item) = items.get(&field.name) {
608                        let res = item.validate_internal(&field.schema, names, enclosing_namespace);
609                        Value::accumulate(acc, res)
610                    } else if !field.is_nullable() {
611                        Value::accumulate(
612                            acc,
613                            Some(format!(
614                                "Field with name '{:?}' is not a member of the map items",
615                                field.name
616                            )),
617                        )
618                    } else {
619                        acc
620                    }
621                })
622            }
623            (v, s) => Some(format!(
624                "Unsupported value-schema combination! Value: {v:?}, schema: {s:?}"
625            )),
626        }
627    }
628
629    /// Attempt to perform schema resolution on the value, with the given
630    /// [Schema](../schema/enum.Schema.html).
631    ///
632    /// See [Schema Resolution](https://avro.apache.org/docs/current/specification/#schema-resolution)
633    /// in the Avro specification for the full set of rules of schema
634    /// resolution.
635    pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
636        self.resolve_schemata(schema, Vec::with_capacity(0))
637    }
638
639    /// Attempt to perform schema resolution on the value, with the given
640    /// [Schema](../schema/enum.Schema.html) and set of schemas to use for Refs resolution.
641    ///
642    /// See [Schema Resolution](https://avro.apache.org/docs/current/specification/#schema-resolution)
643    /// in the Avro specification for the full set of rules of schema
644    /// resolution.
645    pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
646        let enclosing_namespace = schema.namespace();
647        let rs = if schemata.is_empty() {
648            ResolvedSchema::try_from(schema)?
649        } else {
650            ResolvedSchema::try_from(schemata)?
651        };
652        self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
653    }
654
655    pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
656        mut self,
657        schema: &Schema,
658        names: &HashMap<Name, S>,
659        enclosing_namespace: &Namespace,
660        field_default: &Option<JsonValue>,
661    ) -> AvroResult<Self> {
662        // Check if this schema is a union, and if the reader schema is not.
663        if SchemaKind::from(&self) == SchemaKind::Union
664            && SchemaKind::from(schema) != SchemaKind::Union
665        {
666            // Pull out the Union, and attempt to resolve against it.
667            let v = match self {
668                Value::Union(_i, b) => *b,
669                _ => unreachable!(),
670            };
671            self = v;
672        }
673        match *schema {
674            Schema::Ref { ref name } => {
675                let name = name.fully_qualified_name(enclosing_namespace);
676
677                if let Some(resolved) = names.get(&name) {
678                    debug!("Resolved {name:?}");
679                    self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
680                } else {
681                    error!("Failed to resolve schema {name:?}");
682                    Err(Error::SchemaResolutionError(name.clone()))
683                }
684            }
685            Schema::Null => self.resolve_null(),
686            Schema::Boolean => self.resolve_boolean(),
687            Schema::Int => self.resolve_int(),
688            Schema::Long => self.resolve_long(),
689            Schema::Float => self.resolve_float(),
690            Schema::Double => self.resolve_double(),
691            Schema::Bytes => self.resolve_bytes(),
692            Schema::String => self.resolve_string(),
693            Schema::Fixed(FixedSchema { size, .. }) => self.resolve_fixed(size),
694            Schema::Union(ref inner) => {
695                self.resolve_union(inner, names, enclosing_namespace, field_default)
696            }
697            Schema::Enum(EnumSchema {
698                ref symbols,
699                ref default,
700                ..
701            }) => self.resolve_enum(symbols, default, field_default),
702            Schema::Array(ref inner) => {
703                self.resolve_array(&inner.items, names, enclosing_namespace)
704            }
705            Schema::Map(ref inner) => self.resolve_map(&inner.types, names, enclosing_namespace),
706            Schema::Record(RecordSchema { ref fields, .. }) => {
707                self.resolve_record(fields, names, enclosing_namespace)
708            }
709            Schema::Decimal(DecimalSchema {
710                scale,
711                precision,
712                ref inner,
713            }) => self.resolve_decimal(precision, scale, inner),
714            Schema::BigDecimal => self.resolve_bigdecimal(),
715            Schema::Date => self.resolve_date(),
716            Schema::TimeMillis => self.resolve_time_millis(),
717            Schema::TimeMicros => self.resolve_time_micros(),
718            Schema::TimestampMillis => self.resolve_timestamp_millis(),
719            Schema::TimestampMicros => self.resolve_timestamp_micros(),
720            Schema::TimestampNanos => self.resolve_timestamp_nanos(),
721            Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
722            Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
723            Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
724            Schema::Duration => self.resolve_duration(),
725            Schema::Uuid => self.resolve_uuid(),
726        }
727    }
728
729    fn resolve_uuid(self) -> Result<Self, Error> {
730        Ok(match self {
731            uuid @ Value::Uuid(_) => uuid,
732            Value::String(ref string) => {
733                Value::Uuid(Uuid::from_str(string).map_err(Error::ConvertStrToUuid)?)
734            }
735            other => return Err(Error::GetUuid(other)),
736        })
737    }
738
739    fn resolve_bigdecimal(self) -> Result<Self, Error> {
740        Ok(match self {
741            bg @ Value::BigDecimal(_) => bg,
742            Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
743            other => return Err(Error::GetBigDecimal(other)),
744        })
745    }
746
747    fn resolve_duration(self) -> Result<Self, Error> {
748        Ok(match self {
749            duration @ Value::Duration { .. } => duration,
750            Value::Fixed(size, bytes) => {
751                if size != 12 {
752                    return Err(Error::GetDecimalFixedBytes(size));
753                }
754                Value::Duration(Duration::from([
755                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
756                    bytes[8], bytes[9], bytes[10], bytes[11],
757                ]))
758            }
759            other => return Err(Error::ResolveDuration(other)),
760        })
761    }
762
763    fn resolve_decimal(
764        self,
765        precision: Precision,
766        scale: Scale,
767        inner: &Schema,
768    ) -> Result<Self, Error> {
769        if scale > precision {
770            return Err(Error::GetScaleAndPrecision { scale, precision });
771        }
772        match inner {
773            &Schema::Fixed(FixedSchema { size, .. }) => {
774                if max_prec_for_len(size)? < precision {
775                    return Err(Error::GetScaleWithFixedSize { size, precision });
776                }
777            }
778            Schema::Bytes => (),
779            _ => return Err(Error::ResolveDecimalSchema(inner.into())),
780        };
781        match self {
782            Value::Decimal(num) => {
783                let num_bytes = num.len();
784                if max_prec_for_len(num_bytes)? < precision {
785                    Err(Error::ComparePrecisionAndSize {
786                        precision,
787                        num_bytes,
788                    })
789                } else {
790                    Ok(Value::Decimal(num))
791                }
792                // check num.bits() here
793            }
794            Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
795                if max_prec_for_len(bytes.len())? < precision {
796                    Err(Error::ComparePrecisionAndSize {
797                        precision,
798                        num_bytes: bytes.len(),
799                    })
800                } else {
801                    // precision and scale match, can we assume the underlying type can hold the data?
802                    Ok(Value::Decimal(Decimal::from(bytes)))
803                }
804            }
805            other => Err(Error::ResolveDecimal(other)),
806        }
807    }
808
809    fn resolve_date(self) -> Result<Self, Error> {
810        match self {
811            Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
812            other => Err(Error::GetDate(other)),
813        }
814    }
815
816    fn resolve_time_millis(self) -> Result<Self, Error> {
817        match self {
818            Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
819            other => Err(Error::GetTimeMillis(other)),
820        }
821    }
822
823    fn resolve_time_micros(self) -> Result<Self, Error> {
824        match self {
825            Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
826            Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
827            other => Err(Error::GetTimeMicros(other)),
828        }
829    }
830
831    fn resolve_timestamp_millis(self) -> Result<Self, Error> {
832        match self {
833            Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
834            Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
835            other => Err(Error::GetTimestampMillis(other)),
836        }
837    }
838
839    fn resolve_timestamp_micros(self) -> Result<Self, Error> {
840        match self {
841            Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
842            Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
843            other => Err(Error::GetTimestampMicros(other)),
844        }
845    }
846
847    fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
848        match self {
849            Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
850            Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
851            other => Err(Error::GetTimestampNanos(other)),
852        }
853    }
854
855    fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
856        match self {
857            Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
858                Ok(Value::LocalTimestampMillis(ts))
859            }
860            Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
861            other => Err(Error::GetLocalTimestampMillis(other)),
862        }
863    }
864
865    fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
866        match self {
867            Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
868                Ok(Value::LocalTimestampMicros(ts))
869            }
870            Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
871            other => Err(Error::GetLocalTimestampMicros(other)),
872        }
873    }
874
875    fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
876        match self {
877            Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
878            Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
879            other => Err(Error::GetLocalTimestampNanos(other)),
880        }
881    }
882
883    fn resolve_null(self) -> Result<Self, Error> {
884        match self {
885            Value::Null => Ok(Value::Null),
886            other => Err(Error::GetNull(other)),
887        }
888    }
889
890    fn resolve_boolean(self) -> Result<Self, Error> {
891        match self {
892            Value::Boolean(b) => Ok(Value::Boolean(b)),
893            other => Err(Error::GetBoolean(other)),
894        }
895    }
896
897    fn resolve_int(self) -> Result<Self, Error> {
898        match self {
899            Value::Int(n) => Ok(Value::Int(n)),
900            Value::Long(n) => Ok(Value::Int(n as i32)),
901            other => Err(Error::GetInt(other)),
902        }
903    }
904
905    fn resolve_long(self) -> Result<Self, Error> {
906        match self {
907            Value::Int(n) => Ok(Value::Long(i64::from(n))),
908            Value::Long(n) => Ok(Value::Long(n)),
909            other => Err(Error::GetLong(other)),
910        }
911    }
912
913    fn resolve_float(self) -> Result<Self, Error> {
914        match self {
915            Value::Int(n) => Ok(Value::Float(n as f32)),
916            Value::Long(n) => Ok(Value::Float(n as f32)),
917            Value::Float(x) => Ok(Value::Float(x)),
918            Value::Double(x) => Ok(Value::Float(x as f32)),
919            Value::String(ref x) => match Self::parse_special_float(x) {
920                Some(f) => Ok(Value::Float(f)),
921                None => Err(Error::GetFloat(self)),
922            },
923            other => Err(Error::GetFloat(other)),
924        }
925    }
926
927    fn resolve_double(self) -> Result<Self, Error> {
928        match self {
929            Value::Int(n) => Ok(Value::Double(f64::from(n))),
930            Value::Long(n) => Ok(Value::Double(n as f64)),
931            Value::Float(x) => Ok(Value::Double(f64::from(x))),
932            Value::Double(x) => Ok(Value::Double(x)),
933            Value::String(ref x) => match Self::parse_special_float(x) {
934                Some(f) => Ok(Value::Double(f64::from(f))),
935                None => Err(Error::GetDouble(self)),
936            },
937            other => Err(Error::GetDouble(other)),
938        }
939    }
940
941    /// IEEE 754 NaN and infinities are not valid JSON numbers.
942    /// So they are represented in JSON as strings.
943    fn parse_special_float(value: &str) -> Option<f32> {
944        match value {
945            "NaN" => Some(f32::NAN),
946            "INF" | "Infinity" => Some(f32::INFINITY),
947            "-INF" | "-Infinity" => Some(f32::NEG_INFINITY),
948            _ => None,
949        }
950    }
951
952    fn resolve_bytes(self) -> Result<Self, Error> {
953        match self {
954            Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
955            Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
956            Value::Array(items) => Ok(Value::Bytes(
957                items
958                    .into_iter()
959                    .map(Value::try_u8)
960                    .collect::<Result<Vec<_>, _>>()?,
961            )),
962            other => Err(Error::GetBytes(other)),
963        }
964    }
965
966    fn resolve_string(self) -> Result<Self, Error> {
967        match self {
968            Value::String(s) => Ok(Value::String(s)),
969            Value::Bytes(bytes) | Value::Fixed(_, bytes) => Ok(Value::String(
970                String::from_utf8(bytes).map_err(Error::ConvertToUtf8)?,
971            )),
972            other => Err(Error::GetString(other)),
973        }
974    }
975
976    fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
977        match self {
978            Value::Fixed(n, bytes) => {
979                if n == size {
980                    Ok(Value::Fixed(n, bytes))
981                } else {
982                    Err(Error::CompareFixedSizes { size, n })
983                }
984            }
985            Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())),
986            Value::Bytes(s) => {
987                if s.len() == size {
988                    Ok(Value::Fixed(size, s))
989                } else {
990                    Err(Error::CompareFixedSizes { size, n: s.len() })
991                }
992            }
993            other => Err(Error::GetStringForFixed(other)),
994        }
995    }
996
997    pub(crate) fn resolve_enum(
998        self,
999        symbols: &[String],
1000        enum_default: &Option<String>,
1001        _field_default: &Option<JsonValue>,
1002    ) -> Result<Self, Error> {
1003        let validate_symbol = |symbol: String, symbols: &[String]| {
1004            if let Some(index) = symbols.iter().position(|item| item == &symbol) {
1005                Ok(Value::Enum(index as u32, symbol))
1006            } else {
1007                match enum_default {
1008                    Some(default) => {
1009                        if let Some(index) = symbols.iter().position(|item| item == default) {
1010                            Ok(Value::Enum(index as u32, default.clone()))
1011                        } else {
1012                            Err(Error::GetEnumDefault {
1013                                symbol,
1014                                symbols: symbols.into(),
1015                            })
1016                        }
1017                    }
1018                    _ => Err(Error::GetEnumDefault {
1019                        symbol,
1020                        symbols: symbols.into(),
1021                    }),
1022                }
1023            }
1024        };
1025
1026        match self {
1027            Value::Enum(_raw_index, s) => validate_symbol(s, symbols),
1028            Value::String(s) => validate_symbol(s, symbols),
1029            other => Err(Error::GetEnum(other)),
1030        }
1031    }
1032
1033    fn resolve_union<S: Borrow<Schema> + Debug>(
1034        self,
1035        schema: &UnionSchema,
1036        names: &HashMap<Name, S>,
1037        enclosing_namespace: &Namespace,
1038        field_default: &Option<JsonValue>,
1039    ) -> Result<Self, Error> {
1040        let v = match self {
1041            // Both are unions case.
1042            Value::Union(_i, v) => *v,
1043            // Reader is a union, but writer is not.
1044            v => v,
1045        };
1046        let (i, inner) = schema
1047            .find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
1048            .ok_or(Error::FindUnionVariant)?;
1049
1050        Ok(Value::Union(
1051            i as u32,
1052            Box::new(v.resolve_internal(inner, names, enclosing_namespace, field_default)?),
1053        ))
1054    }
1055
1056    fn resolve_array<S: Borrow<Schema> + Debug>(
1057        self,
1058        schema: &Schema,
1059        names: &HashMap<Name, S>,
1060        enclosing_namespace: &Namespace,
1061    ) -> Result<Self, Error> {
1062        match self {
1063            Value::Array(items) => Ok(Value::Array(
1064                items
1065                    .into_iter()
1066                    .map(|item| item.resolve_internal(schema, names, enclosing_namespace, &None))
1067                    .collect::<Result<_, _>>()?,
1068            )),
1069            other => Err(Error::GetArray {
1070                expected: schema.into(),
1071                other,
1072            }),
1073        }
1074    }
1075
1076    fn resolve_map<S: Borrow<Schema> + Debug>(
1077        self,
1078        schema: &Schema,
1079        names: &HashMap<Name, S>,
1080        enclosing_namespace: &Namespace,
1081    ) -> Result<Self, Error> {
1082        match self {
1083            Value::Map(items) => Ok(Value::Map(
1084                items
1085                    .into_iter()
1086                    .map(|(key, value)| {
1087                        value
1088                            .resolve_internal(schema, names, enclosing_namespace, &None)
1089                            .map(|value| (key, value))
1090                    })
1091                    .collect::<Result<_, _>>()?,
1092            )),
1093            other => Err(Error::GetMap {
1094                expected: schema.into(),
1095                other,
1096            }),
1097        }
1098    }
1099
1100    fn resolve_record<S: Borrow<Schema> + Debug>(
1101        self,
1102        fields: &[RecordField],
1103        names: &HashMap<Name, S>,
1104        enclosing_namespace: &Namespace,
1105    ) -> Result<Self, Error> {
1106        let mut items = match self {
1107            Value::Map(items) => Ok(items),
1108            Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
1109            other => Err(Error::GetRecord {
1110                expected: fields
1111                    .iter()
1112                    .map(|field| (field.name.clone(), field.schema.clone().into()))
1113                    .collect(),
1114                other,
1115            }),
1116        }?;
1117
1118        let new_fields = fields
1119            .iter()
1120            .map(|field| {
1121                let value = match items.remove(&field.name) {
1122                    Some(value) => value,
1123                    None => match field.default {
1124                        Some(ref value) => match field.schema {
1125                            Schema::Enum(EnumSchema {
1126                                ref symbols,
1127                                ref default,
1128                                ..
1129                            }) => Value::from(value.clone()).resolve_enum(
1130                                symbols,
1131                                default,
1132                                &field.default.clone(),
1133                            )?,
1134                            Schema::Union(ref union_schema) => {
1135                                let first = &union_schema.variants()[0];
1136                                // NOTE: this match exists only to optimize null defaults for large
1137                                // backward-compatible schemas with many nullable fields
1138                                match first {
1139                                    Schema::Null => Value::Union(0, Box::new(Value::Null)),
1140                                    _ => Value::Union(
1141                                        0,
1142                                        Box::new(Value::from(value.clone()).resolve_internal(
1143                                            first,
1144                                            names,
1145                                            enclosing_namespace,
1146                                            &field.default,
1147                                        )?),
1148                                    ),
1149                                }
1150                            }
1151                            _ => Value::from(value.clone()),
1152                        },
1153                        None => {
1154                            return Err(Error::GetField(field.name.clone()));
1155                        }
1156                    },
1157                };
1158                value
1159                    .resolve_internal(&field.schema, names, enclosing_namespace, &field.default)
1160                    .map(|value| (field.name.clone(), value))
1161            })
1162            .collect::<Result<Vec<_>, _>>()?;
1163
1164        Ok(Value::Record(new_fields))
1165    }
1166
1167    fn try_u8(self) -> AvroResult<u8> {
1168        let int = self.resolve(&Schema::Int)?;
1169        if let Value::Int(n) = int {
1170            if n >= 0 && n <= i32::from(u8::MAX) {
1171                return Ok(n as u8);
1172            }
1173        }
1174
1175        Err(Error::GetU8(int))
1176    }
1177}
1178
1179#[cfg(test)]
1180mod tests {
1181    use super::*;
1182    use crate::{
1183        duration::{Days, Millis, Months},
1184        schema::RecordFieldOrder,
1185    };
1186    use apache_avro_test_helper::{
1187        logger::{assert_logged, assert_not_logged},
1188        TestResult,
1189    };
1190    use num_bigint::BigInt;
1191    use pretty_assertions::assert_eq;
1192    use serde_json::json;
1193
1194    #[test]
1195    fn avro_3809_validate_nested_records_with_implicit_namespace() -> TestResult {
1196        let schema = Schema::parse_str(
1197            r#"{
1198            "name": "record_name",
1199            "namespace": "space",
1200            "type": "record",
1201            "fields": [
1202              {
1203                "name": "outer_field_1",
1204                "type": {
1205                  "type": "record",
1206                  "name": "middle_record_name",
1207                  "namespace": "middle_namespace",
1208                  "fields": [
1209                    {
1210                      "name": "middle_field_1",
1211                      "type": {
1212                        "type": "record",
1213                        "name": "inner_record_name",
1214                        "fields": [
1215                          { "name": "inner_field_1", "type": "double" }
1216                        ]
1217                      }
1218                    },
1219                    { "name": "middle_field_2", "type": "inner_record_name" }
1220                  ]
1221                }
1222              }
1223            ]
1224          }"#,
1225        )?;
1226        let value = Value::Record(vec![(
1227            "outer_field_1".into(),
1228            Value::Record(vec![
1229                (
1230                    "middle_field_1".into(),
1231                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.2f64))]),
1232                ),
1233                (
1234                    "middle_field_2".into(),
1235                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.6f64))]),
1236                ),
1237            ]),
1238        )]);
1239
1240        assert!(value.validate(&schema));
1241        Ok(())
1242    }
1243
1244    #[test]
1245    fn validate() -> TestResult {
1246        let value_schema_valid = vec![
1247            (Value::Int(42), Schema::Int, true, ""),
1248            (Value::Int(43), Schema::Long, true, ""),
1249            (Value::Float(43.2), Schema::Float, true, ""),
1250            (Value::Float(45.9), Schema::Double, true, ""),
1251            (
1252                Value::Int(42),
1253                Schema::Boolean,
1254                false,
1255                "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination! Value: Int(42), schema: Boolean",
1256            ),
1257            (
1258                Value::Union(0, Box::new(Value::Null)),
1259                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1260                true,
1261                "",
1262            ),
1263            (
1264                Value::Union(1, Box::new(Value::Int(42))),
1265                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1266                true,
1267                "",
1268            ),
1269            (
1270                Value::Union(0, Box::new(Value::Null)),
1271                Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int])?),
1272                false,
1273                "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination! Value: Null, schema: Double",
1274            ),
1275            (
1276                Value::Union(3, Box::new(Value::Int(42))),
1277                Schema::Union(
1278                    UnionSchema::new(vec![
1279                        Schema::Null,
1280                        Schema::Double,
1281                        Schema::String,
1282                        Schema::Int,
1283                    ])
1284                    ?,
1285                ),
1286                true,
1287                "",
1288            ),
1289            (
1290                Value::Union(1, Box::new(Value::Long(42i64))),
1291                Schema::Union(
1292                    UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis])?,
1293                ),
1294                true,
1295                "",
1296            ),
1297            (
1298                Value::Union(2, Box::new(Value::Long(1_i64))),
1299                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1300                false,
1301                "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
1302            ),
1303            (
1304                Value::Array(vec![Value::Long(42i64)]),
1305                Schema::array(Schema::Long),
1306                true,
1307                "",
1308            ),
1309            (
1310                Value::Array(vec![Value::Boolean(true)]),
1311                Schema::array(Schema::Long),
1312                false,
1313                "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long",
1314            ),
1315            (Value::Record(vec![]), Schema::Null, false, "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination! Value: Record([]), schema: Null"),
1316            (
1317                Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
1318                Schema::Duration,
1319                true,
1320                "",
1321            ),
1322            (
1323                Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1324                Schema::Duration,
1325                false,
1326                "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be a Duration",
1327            ),
1328            (
1329                Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
1330                Schema::Record(RecordSchema {
1331                    name: Name::new("record_name").unwrap(),
1332                    aliases: None,
1333                    doc: None,
1334                    fields: vec![RecordField {
1335                        name: "field_name".to_string(),
1336                        doc: None,
1337                        default: None,
1338                        aliases: None,
1339                        schema: Schema::Int,
1340                        order: RecordFieldOrder::Ignore,
1341                        position: 0,
1342                        custom_attributes: Default::default(),
1343                    }],
1344                    lookup: Default::default(),
1345                    attributes: Default::default(),
1346                }),
1347                false,
1348                r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }). Reason: There is no schema field for field 'unknown_field_name'"#,
1349            ),
1350            (
1351                Value::Record(vec![("field_name".to_string(), Value::Null)]),
1352                Schema::Record(RecordSchema {
1353                    name: Name::new("record_name").unwrap(),
1354                    aliases: None,
1355                    doc: None,
1356                    fields: vec![RecordField {
1357                        name: "field_name".to_string(),
1358                        doc: None,
1359                        default: None,
1360                        aliases: None,
1361                        schema: Schema::Ref {
1362                            name: Name::new("missing").unwrap(),
1363                        },
1364                        order: RecordFieldOrder::Ignore,
1365                        position: 0,
1366                        custom_attributes: Default::default(),
1367                    }],
1368                    lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
1369                    attributes: Default::default(),
1370                }),
1371                false,
1372                r#"Invalid value: Record([("field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }). Reason: Unresolved schema reference: 'Name { name: "missing", namespace: None }'. Parsed names: []"#,
1373            ),
1374        ];
1375
1376        for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
1377            let err_message =
1378                value.validate_internal::<Schema>(&schema, &HashMap::default(), &None);
1379            assert_eq!(valid, err_message.is_none());
1380            if !valid {
1381                let full_err_message = format!(
1382                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
1383                    value,
1384                    schema,
1385                    err_message.unwrap()
1386                );
1387                assert_eq!(expected_err_message, full_err_message);
1388            }
1389        }
1390
1391        Ok(())
1392    }
1393
1394    #[test]
1395    fn validate_fixed() -> TestResult {
1396        let schema = Schema::Fixed(FixedSchema {
1397            size: 4,
1398            name: Name::new("some_fixed").unwrap(),
1399            aliases: None,
1400            doc: None,
1401            default: None,
1402            attributes: Default::default(),
1403        });
1404
1405        assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
1406        let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
1407        assert!(!value.validate(&schema));
1408        assert_logged(
1409            format!(
1410                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1411                value, schema, "The value's size (5) is different than the schema's size (4)"
1412            )
1413            .as_str(),
1414        );
1415
1416        assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
1417        let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
1418        assert!(!value.validate(&schema));
1419        assert_logged(
1420            format!(
1421                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1422                value, schema, "The bytes' length (5) is different than the schema's size (4)"
1423            )
1424            .as_str(),
1425        );
1426
1427        Ok(())
1428    }
1429
1430    #[test]
1431    fn validate_enum() -> TestResult {
1432        let schema = Schema::Enum(EnumSchema {
1433            name: Name::new("some_enum").unwrap(),
1434            aliases: None,
1435            doc: None,
1436            symbols: vec![
1437                "spades".to_string(),
1438                "hearts".to_string(),
1439                "diamonds".to_string(),
1440                "clubs".to_string(),
1441            ],
1442            default: None,
1443            attributes: Default::default(),
1444        });
1445
1446        assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
1447        assert!(Value::String("spades".to_string()).validate(&schema));
1448
1449        let value = Value::Enum(1, "spades".to_string());
1450        assert!(!value.validate(&schema));
1451        assert_logged(
1452            format!(
1453                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1454                value, schema, "Symbol 'spades' is not at position '1'"
1455            )
1456            .as_str(),
1457        );
1458
1459        let value = Value::Enum(1000, "spades".to_string());
1460        assert!(!value.validate(&schema));
1461        assert_logged(
1462            format!(
1463                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1464                value, schema, "No symbol at position '1000'"
1465            )
1466            .as_str(),
1467        );
1468
1469        let value = Value::String("lorem".to_string());
1470        assert!(!value.validate(&schema));
1471        assert_logged(
1472            format!(
1473                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1474                value, schema, "'lorem' is not a member of the possible symbols"
1475            )
1476            .as_str(),
1477        );
1478
1479        let other_schema = Schema::Enum(EnumSchema {
1480            name: Name::new("some_other_enum").unwrap(),
1481            aliases: None,
1482            doc: None,
1483            symbols: vec![
1484                "hearts".to_string(),
1485                "diamonds".to_string(),
1486                "clubs".to_string(),
1487                "spades".to_string(),
1488            ],
1489            default: None,
1490            attributes: Default::default(),
1491        });
1492
1493        let value = Value::Enum(0, "spades".to_string());
1494        assert!(!value.validate(&other_schema));
1495        assert_logged(
1496            format!(
1497                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1498                value, other_schema, "Symbol 'spades' is not at position '0'"
1499            )
1500            .as_str(),
1501        );
1502
1503        Ok(())
1504    }
1505
1506    #[test]
1507    fn validate_record() -> TestResult {
1508        // {
1509        //    "type": "record",
1510        //    "fields": [
1511        //      {"type": "long", "name": "a"},
1512        //      {"type": "string", "name": "b"},
1513        //      {
1514        //          "type": ["null", "int"]
1515        //          "name": "c",
1516        //          "default": null
1517        //      }
1518        //    ]
1519        // }
1520        let schema = Schema::Record(RecordSchema {
1521            name: Name::new("some_record").unwrap(),
1522            aliases: None,
1523            doc: None,
1524            fields: vec![
1525                RecordField {
1526                    name: "a".to_string(),
1527                    doc: None,
1528                    default: None,
1529                    aliases: None,
1530                    schema: Schema::Long,
1531                    order: RecordFieldOrder::Ascending,
1532                    position: 0,
1533                    custom_attributes: Default::default(),
1534                },
1535                RecordField {
1536                    name: "b".to_string(),
1537                    doc: None,
1538                    default: None,
1539                    aliases: None,
1540                    schema: Schema::String,
1541                    order: RecordFieldOrder::Ascending,
1542                    position: 1,
1543                    custom_attributes: Default::default(),
1544                },
1545                RecordField {
1546                    name: "c".to_string(),
1547                    doc: None,
1548                    default: Some(JsonValue::Null),
1549                    aliases: None,
1550                    schema: Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1551                    order: RecordFieldOrder::Ascending,
1552                    position: 2,
1553                    custom_attributes: Default::default(),
1554                },
1555            ],
1556            lookup: [
1557                ("a".to_string(), 0),
1558                ("b".to_string(), 1),
1559                ("c".to_string(), 2),
1560            ]
1561            .iter()
1562            .cloned()
1563            .collect(),
1564            attributes: Default::default(),
1565        });
1566
1567        assert!(Value::Record(vec![
1568            ("a".to_string(), Value::Long(42i64)),
1569            ("b".to_string(), Value::String("foo".to_string())),
1570        ])
1571        .validate(&schema));
1572
1573        let value = Value::Record(vec![
1574            ("b".to_string(), Value::String("foo".to_string())),
1575            ("a".to_string(), Value::Long(42i64)),
1576        ]);
1577        assert!(value.validate(&schema));
1578
1579        let value = Value::Record(vec![
1580            ("a".to_string(), Value::Boolean(false)),
1581            ("b".to_string(), Value::String("foo".to_string())),
1582        ]);
1583        assert!(!value.validate(&schema));
1584        assert_logged(
1585            r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(false), schema: Long"#,
1586        );
1587
1588        let value = Value::Record(vec![
1589            ("a".to_string(), Value::Long(42i64)),
1590            ("c".to_string(), Value::String("foo".to_string())),
1591        ]);
1592        assert!(!value.validate(&schema));
1593        assert_logged(
1594            r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Could not find matching type in union"#,
1595        );
1596        assert_not_logged(
1597            r#"Invalid value: String("foo") for schema: Int. Reason: Unsupported value-schema combination"#,
1598        );
1599
1600        let value = Value::Record(vec![
1601            ("a".to_string(), Value::Long(42i64)),
1602            ("d".to_string(), Value::String("foo".to_string())),
1603        ]);
1604        assert!(!value.validate(&schema));
1605        assert_logged(
1606            r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: There is no schema field for field 'd'"#,
1607        );
1608
1609        let value = Value::Record(vec![
1610            ("a".to_string(), Value::Long(42i64)),
1611            ("b".to_string(), Value::String("foo".to_string())),
1612            ("c".to_string(), Value::Null),
1613            ("d".to_string(), Value::Null),
1614        ]);
1615        assert!(!value.validate(&schema));
1616        assert_logged(
1617            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: The value's records length (4) is greater than the schema's (3 fields)"#,
1618        );
1619
1620        assert!(Value::Map(
1621            vec![
1622                ("a".to_string(), Value::Long(42i64)),
1623                ("b".to_string(), Value::String("foo".to_string())),
1624            ]
1625            .into_iter()
1626            .collect()
1627        )
1628        .validate(&schema));
1629
1630        assert!(!Value::Map(
1631            vec![("d".to_string(), Value::Long(123_i64)),]
1632                .into_iter()
1633                .collect()
1634        )
1635        .validate(&schema));
1636        assert_logged(
1637            r#"Invalid value: Map({"d": Long(123)}) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Field with name '"a"' is not a member of the map items
1638Field with name '"b"' is not a member of the map items"#,
1639        );
1640
1641        let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema])?);
1642
1643        assert!(Value::Union(
1644            1,
1645            Box::new(Value::Record(vec![
1646                ("a".to_string(), Value::Long(42i64)),
1647                ("b".to_string(), Value::String("foo".to_string())),
1648            ]))
1649        )
1650        .validate(&union_schema));
1651
1652        assert!(Value::Union(
1653            1,
1654            Box::new(Value::Map(
1655                vec![
1656                    ("a".to_string(), Value::Long(42i64)),
1657                    ("b".to_string(), Value::String("foo".to_string())),
1658                ]
1659                .into_iter()
1660                .collect()
1661            ))
1662        )
1663        .validate(&union_schema));
1664
1665        Ok(())
1666    }
1667
1668    #[test]
1669    fn resolve_bytes_ok() -> TestResult {
1670        let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
1671        assert_eq!(
1672            value.resolve(&Schema::Bytes)?,
1673            Value::Bytes(vec![0u8, 42u8])
1674        );
1675
1676        Ok(())
1677    }
1678
1679    #[test]
1680    fn resolve_string_from_bytes() -> TestResult {
1681        let value = Value::Bytes(vec![97, 98, 99]);
1682        assert_eq!(
1683            value.resolve(&Schema::String)?,
1684            Value::String("abc".to_string())
1685        );
1686
1687        Ok(())
1688    }
1689
1690    #[test]
1691    fn resolve_string_from_fixed() -> TestResult {
1692        let value = Value::Fixed(3, vec![97, 98, 99]);
1693        assert_eq!(
1694            value.resolve(&Schema::String)?,
1695            Value::String("abc".to_string())
1696        );
1697
1698        Ok(())
1699    }
1700
1701    #[test]
1702    fn resolve_bytes_failure() {
1703        let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
1704        assert!(value.resolve(&Schema::Bytes).is_err());
1705    }
1706
1707    #[test]
1708    fn resolve_decimal_bytes() -> TestResult {
1709        let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1710        value.clone().resolve(&Schema::Decimal(DecimalSchema {
1711            precision: 10,
1712            scale: 4,
1713            inner: Box::new(Schema::Bytes),
1714        }))?;
1715        assert!(value.resolve(&Schema::String).is_err());
1716
1717        Ok(())
1718    }
1719
1720    #[test]
1721    fn resolve_decimal_invalid_scale() {
1722        let value = Value::Decimal(Decimal::from(vec![1, 2]));
1723        assert!(value
1724            .resolve(&Schema::Decimal(DecimalSchema {
1725                precision: 2,
1726                scale: 3,
1727                inner: Box::new(Schema::Bytes),
1728            }))
1729            .is_err());
1730    }
1731
1732    #[test]
1733    fn resolve_decimal_invalid_precision_for_length() {
1734        let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
1735        assert!(value
1736            .resolve(&Schema::Decimal(DecimalSchema {
1737                precision: 1,
1738                scale: 0,
1739                inner: Box::new(Schema::Bytes),
1740            }))
1741            .is_ok());
1742    }
1743
1744    #[test]
1745    fn resolve_decimal_fixed() {
1746        let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1747        assert!(value
1748            .clone()
1749            .resolve(&Schema::Decimal(DecimalSchema {
1750                precision: 10,
1751                scale: 1,
1752                inner: Box::new(Schema::Fixed(FixedSchema {
1753                    name: Name::new("decimal").unwrap(),
1754                    aliases: None,
1755                    size: 20,
1756                    doc: None,
1757                    default: None,
1758                    attributes: Default::default(),
1759                }))
1760            }))
1761            .is_ok());
1762        assert!(value.resolve(&Schema::String).is_err());
1763    }
1764
1765    #[test]
1766    fn resolve_date() {
1767        let value = Value::Date(2345);
1768        assert!(value.clone().resolve(&Schema::Date).is_ok());
1769        assert!(value.resolve(&Schema::String).is_err());
1770    }
1771
1772    #[test]
1773    fn resolve_time_millis() {
1774        let value = Value::TimeMillis(10);
1775        assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1776        assert!(value.resolve(&Schema::TimeMicros).is_err());
1777    }
1778
1779    #[test]
1780    fn resolve_time_micros() {
1781        let value = Value::TimeMicros(10);
1782        assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1783        assert!(value.resolve(&Schema::TimeMillis).is_err());
1784    }
1785
1786    #[test]
1787    fn resolve_timestamp_millis() {
1788        let value = Value::TimestampMillis(10);
1789        assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1790        assert!(value.resolve(&Schema::Float).is_err());
1791
1792        let value = Value::Float(10.0f32);
1793        assert!(value.resolve(&Schema::TimestampMillis).is_err());
1794    }
1795
1796    #[test]
1797    fn resolve_timestamp_micros() {
1798        let value = Value::TimestampMicros(10);
1799        assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1800        assert!(value.resolve(&Schema::Int).is_err());
1801
1802        let value = Value::Double(10.0);
1803        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1804    }
1805
1806    #[test]
1807    fn test_avro_3914_resolve_timestamp_nanos() {
1808        let value = Value::TimestampNanos(10);
1809        assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
1810        assert!(value.resolve(&Schema::Int).is_err());
1811
1812        let value = Value::Double(10.0);
1813        assert!(value.resolve(&Schema::TimestampNanos).is_err());
1814    }
1815
1816    #[test]
1817    fn test_avro_3853_resolve_timestamp_millis() {
1818        let value = Value::LocalTimestampMillis(10);
1819        assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
1820        assert!(value.resolve(&Schema::Float).is_err());
1821
1822        let value = Value::Float(10.0f32);
1823        assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
1824    }
1825
1826    #[test]
1827    fn test_avro_3853_resolve_timestamp_micros() {
1828        let value = Value::LocalTimestampMicros(10);
1829        assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
1830        assert!(value.resolve(&Schema::Int).is_err());
1831
1832        let value = Value::Double(10.0);
1833        assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
1834    }
1835
1836    #[test]
1837    fn test_avro_3916_resolve_timestamp_nanos() {
1838        let value = Value::LocalTimestampNanos(10);
1839        assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
1840        assert!(value.resolve(&Schema::Int).is_err());
1841
1842        let value = Value::Double(10.0);
1843        assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
1844    }
1845
1846    #[test]
1847    fn resolve_duration() {
1848        let value = Value::Duration(Duration::new(
1849            Months::new(10),
1850            Days::new(5),
1851            Millis::new(3000),
1852        ));
1853        assert!(value.clone().resolve(&Schema::Duration).is_ok());
1854        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1855        assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
1856    }
1857
1858    #[test]
1859    fn resolve_uuid() -> TestResult {
1860        let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537")?);
1861        assert!(value.clone().resolve(&Schema::Uuid).is_ok());
1862        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1863
1864        Ok(())
1865    }
1866
1867    #[test]
1868    fn avro_3678_resolve_float_to_double() {
1869        let value = Value::Float(2345.1);
1870        assert!(value.resolve(&Schema::Double).is_ok());
1871    }
1872
1873    #[test]
1874    fn test_avro_3621_resolve_to_nullable_union() -> TestResult {
1875        let schema = Schema::parse_str(
1876            r#"{
1877            "type": "record",
1878            "name": "root",
1879            "fields": [
1880                {
1881                    "name": "event",
1882                    "type": [
1883                        "null",
1884                        {
1885                            "type": "record",
1886                            "name": "event",
1887                            "fields": [
1888                                {
1889                                    "name": "amount",
1890                                    "type": "int"
1891                                },
1892                                {
1893                                    "name": "size",
1894                                    "type": [
1895                                        "null",
1896                                        "int"
1897                                    ],
1898                                    "default": null
1899                                }
1900                            ]
1901                        }
1902                    ],
1903                    "default": null
1904                }
1905            ]
1906        }"#,
1907        )?;
1908
1909        let value = Value::Record(vec![(
1910            "event".to_string(),
1911            Value::Record(vec![("amount".to_string(), Value::Int(200))]),
1912        )]);
1913        assert!(value.resolve(&schema).is_ok());
1914
1915        let value = Value::Record(vec![(
1916            "event".to_string(),
1917            Value::Record(vec![("size".to_string(), Value::Int(1))]),
1918        )]);
1919        assert!(value.resolve(&schema).is_err());
1920
1921        Ok(())
1922    }
1923
1924    #[test]
1925    fn json_from_avro() -> TestResult {
1926        assert_eq!(JsonValue::try_from(Value::Null)?, JsonValue::Null);
1927        assert_eq!(
1928            JsonValue::try_from(Value::Boolean(true))?,
1929            JsonValue::Bool(true)
1930        );
1931        assert_eq!(
1932            JsonValue::try_from(Value::Int(1))?,
1933            JsonValue::Number(1.into())
1934        );
1935        assert_eq!(
1936            JsonValue::try_from(Value::Long(1))?,
1937            JsonValue::Number(1.into())
1938        );
1939        assert_eq!(
1940            JsonValue::try_from(Value::Float(1.0))?,
1941            JsonValue::Number(Number::from_f64(1.0).unwrap())
1942        );
1943        assert_eq!(
1944            JsonValue::try_from(Value::Double(1.0))?,
1945            JsonValue::Number(Number::from_f64(1.0).unwrap())
1946        );
1947        assert_eq!(
1948            JsonValue::try_from(Value::Bytes(vec![1, 2, 3]))?,
1949            JsonValue::Array(vec![
1950                JsonValue::Number(1.into()),
1951                JsonValue::Number(2.into()),
1952                JsonValue::Number(3.into())
1953            ])
1954        );
1955        assert_eq!(
1956            JsonValue::try_from(Value::String("test".into()))?,
1957            JsonValue::String("test".into())
1958        );
1959        assert_eq!(
1960            JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3]))?,
1961            JsonValue::Array(vec![
1962                JsonValue::Number(1.into()),
1963                JsonValue::Number(2.into()),
1964                JsonValue::Number(3.into())
1965            ])
1966        );
1967        assert_eq!(
1968            JsonValue::try_from(Value::Enum(1, "test_enum".into()))?,
1969            JsonValue::String("test_enum".into())
1970        );
1971        assert_eq!(
1972            JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))?,
1973            JsonValue::String("test_enum".into())
1974        );
1975        assert_eq!(
1976            JsonValue::try_from(Value::Array(vec![
1977                Value::Int(1),
1978                Value::Int(2),
1979                Value::Int(3)
1980            ]))?,
1981            JsonValue::Array(vec![
1982                JsonValue::Number(1.into()),
1983                JsonValue::Number(2.into()),
1984                JsonValue::Number(3.into())
1985            ])
1986        );
1987        assert_eq!(
1988            JsonValue::try_from(Value::Map(
1989                vec![
1990                    ("v1".to_string(), Value::Int(1)),
1991                    ("v2".to_string(), Value::Int(2)),
1992                    ("v3".to_string(), Value::Int(3))
1993                ]
1994                .into_iter()
1995                .collect()
1996            ))?,
1997            JsonValue::Object(
1998                vec![
1999                    ("v1".to_string(), JsonValue::Number(1.into())),
2000                    ("v2".to_string(), JsonValue::Number(2.into())),
2001                    ("v3".to_string(), JsonValue::Number(3.into()))
2002                ]
2003                .into_iter()
2004                .collect()
2005            )
2006        );
2007        assert_eq!(
2008            JsonValue::try_from(Value::Record(vec![
2009                ("v1".to_string(), Value::Int(1)),
2010                ("v2".to_string(), Value::Int(2)),
2011                ("v3".to_string(), Value::Int(3))
2012            ]))?,
2013            JsonValue::Object(
2014                vec![
2015                    ("v1".to_string(), JsonValue::Number(1.into())),
2016                    ("v2".to_string(), JsonValue::Number(2.into())),
2017                    ("v3".to_string(), JsonValue::Number(3.into()))
2018                ]
2019                .into_iter()
2020                .collect()
2021            )
2022        );
2023        assert_eq!(
2024            JsonValue::try_from(Value::Date(1))?,
2025            JsonValue::Number(1.into())
2026        );
2027        assert_eq!(
2028            JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into()))?,
2029            JsonValue::Array(vec![
2030                JsonValue::Number(1.into()),
2031                JsonValue::Number(2.into()),
2032                JsonValue::Number(3.into())
2033            ])
2034        );
2035        assert_eq!(
2036            JsonValue::try_from(Value::TimeMillis(1))?,
2037            JsonValue::Number(1.into())
2038        );
2039        assert_eq!(
2040            JsonValue::try_from(Value::TimeMicros(1))?,
2041            JsonValue::Number(1.into())
2042        );
2043        assert_eq!(
2044            JsonValue::try_from(Value::TimestampMillis(1))?,
2045            JsonValue::Number(1.into())
2046        );
2047        assert_eq!(
2048            JsonValue::try_from(Value::TimestampMicros(1))?,
2049            JsonValue::Number(1.into())
2050        );
2051        assert_eq!(
2052            JsonValue::try_from(Value::TimestampNanos(1))?,
2053            JsonValue::Number(1.into())
2054        );
2055        assert_eq!(
2056            JsonValue::try_from(Value::LocalTimestampMillis(1))?,
2057            JsonValue::Number(1.into())
2058        );
2059        assert_eq!(
2060            JsonValue::try_from(Value::LocalTimestampMicros(1))?,
2061            JsonValue::Number(1.into())
2062        );
2063        assert_eq!(
2064            JsonValue::try_from(Value::LocalTimestampNanos(1))?,
2065            JsonValue::Number(1.into())
2066        );
2067        assert_eq!(
2068            JsonValue::try_from(Value::Duration(
2069                [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
2070            ))?,
2071            JsonValue::Array(vec![
2072                JsonValue::Number(1.into()),
2073                JsonValue::Number(2.into()),
2074                JsonValue::Number(3.into()),
2075                JsonValue::Number(4.into()),
2076                JsonValue::Number(5.into()),
2077                JsonValue::Number(6.into()),
2078                JsonValue::Number(7.into()),
2079                JsonValue::Number(8.into()),
2080                JsonValue::Number(9.into()),
2081                JsonValue::Number(10.into()),
2082                JsonValue::Number(11.into()),
2083                JsonValue::Number(12.into()),
2084            ])
2085        );
2086        assert_eq!(
2087            JsonValue::try_from(Value::Uuid(Uuid::parse_str(
2088                "936DA01F-9ABD-4D9D-80C7-02AF85C822A8"
2089            )?))?,
2090            JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
2091        );
2092
2093        Ok(())
2094    }
2095
2096    #[test]
2097    fn test_avro_3433_recursive_resolves_record() -> TestResult {
2098        let schema = Schema::parse_str(
2099            r#"
2100        {
2101            "type":"record",
2102            "name":"TestStruct",
2103            "fields": [
2104                {
2105                    "name":"a",
2106                    "type":{
2107                        "type":"record",
2108                        "name": "Inner",
2109                        "fields": [ {
2110                            "name":"z",
2111                            "type":"int"
2112                        }]
2113                    }
2114                },
2115                {
2116                    "name":"b",
2117                    "type":"Inner"
2118                }
2119            ]
2120        }"#,
2121        )?;
2122
2123        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2124        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2125        let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2126        outer
2127            .resolve(&schema)
2128            .expect("Record definition defined in one field must be available in other field");
2129
2130        Ok(())
2131    }
2132
2133    #[test]
2134    fn test_avro_3433_recursive_resolves_array() -> TestResult {
2135        let schema = Schema::parse_str(
2136            r#"
2137        {
2138            "type":"record",
2139            "name":"TestStruct",
2140            "fields": [
2141                {
2142                    "name":"a",
2143                    "type":{
2144                        "type":"array",
2145                        "items": {
2146                            "type":"record",
2147                            "name": "Inner",
2148                            "fields": [ {
2149                                "name":"z",
2150                                "type":"int"
2151                            }]
2152                        }
2153                    }
2154                },
2155                {
2156                    "name":"b",
2157                    "type": {
2158                        "type":"map",
2159                        "values":"Inner"
2160                    }
2161                }
2162            ]
2163        }"#,
2164        )?;
2165
2166        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2167        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2168        let outer_value = Value::Record(vec![
2169            ("a".into(), Value::Array(vec![inner_value1])),
2170            (
2171                "b".into(),
2172                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2173            ),
2174        ]);
2175        outer_value
2176            .resolve(&schema)
2177            .expect("Record defined in array definition must be resolvable from map");
2178
2179        Ok(())
2180    }
2181
2182    #[test]
2183    fn test_avro_3433_recursive_resolves_map() -> TestResult {
2184        let schema = Schema::parse_str(
2185            r#"
2186        {
2187            "type":"record",
2188            "name":"TestStruct",
2189            "fields": [
2190                {
2191                    "name":"a",
2192                    "type":{
2193                        "type":"record",
2194                        "name": "Inner",
2195                        "fields": [ {
2196                            "name":"z",
2197                            "type":"int"
2198                        }]
2199                    }
2200                },
2201                {
2202                    "name":"b",
2203                    "type": {
2204                        "type":"map",
2205                        "values":"Inner"
2206                    }
2207                }
2208            ]
2209        }"#,
2210        )?;
2211
2212        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2213        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2214        let outer_value = Value::Record(vec![
2215            ("a".into(), inner_value1),
2216            (
2217                "b".into(),
2218                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2219            ),
2220        ]);
2221        outer_value
2222            .resolve(&schema)
2223            .expect("Record defined in record field must be resolvable from map field");
2224
2225        Ok(())
2226    }
2227
2228    #[test]
2229    fn test_avro_3433_recursive_resolves_record_wrapper() -> TestResult {
2230        let schema = Schema::parse_str(
2231            r#"
2232        {
2233            "type":"record",
2234            "name":"TestStruct",
2235            "fields": [
2236                {
2237                    "name":"a",
2238                    "type":{
2239                        "type":"record",
2240                        "name": "Inner",
2241                        "fields": [ {
2242                            "name":"z",
2243                            "type":"int"
2244                        }]
2245                    }
2246                },
2247                {
2248                    "name":"b",
2249                    "type": {
2250                        "type":"record",
2251                        "name": "InnerWrapper",
2252                        "fields": [ {
2253                            "name":"j",
2254                            "type":"Inner"
2255                        }]
2256                    }
2257                }
2258            ]
2259        }"#,
2260        )?;
2261
2262        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2263        let inner_value2 = Value::Record(vec![(
2264            "j".into(),
2265            Value::Record(vec![("z".into(), Value::Int(6))]),
2266        )]);
2267        let outer_value =
2268            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2269        outer_value.resolve(&schema).expect("Record schema defined in field must be resolvable in Record schema defined in other field");
2270
2271        Ok(())
2272    }
2273
2274    #[test]
2275    fn test_avro_3433_recursive_resolves_map_and_array() -> TestResult {
2276        let schema = Schema::parse_str(
2277            r#"
2278        {
2279            "type":"record",
2280            "name":"TestStruct",
2281            "fields": [
2282                {
2283                    "name":"a",
2284                    "type":{
2285                        "type":"map",
2286                        "values": {
2287                            "type":"record",
2288                            "name": "Inner",
2289                            "fields": [ {
2290                                "name":"z",
2291                                "type":"int"
2292                            }]
2293                        }
2294                    }
2295                },
2296                {
2297                    "name":"b",
2298                    "type": {
2299                        "type":"array",
2300                        "items":"Inner"
2301                    }
2302                }
2303            ]
2304        }"#,
2305        )?;
2306
2307        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2308        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2309        let outer_value = Value::Record(vec![
2310            (
2311                "a".into(),
2312                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2313            ),
2314            ("b".into(), Value::Array(vec![inner_value1])),
2315        ]);
2316        outer_value
2317            .resolve(&schema)
2318            .expect("Record defined in map definition must be resolvable from array");
2319
2320        Ok(())
2321    }
2322
2323    #[test]
2324    fn test_avro_3433_recursive_resolves_union() -> TestResult {
2325        let schema = Schema::parse_str(
2326            r#"
2327        {
2328            "type":"record",
2329            "name":"TestStruct",
2330            "fields": [
2331                {
2332                    "name":"a",
2333                    "type":["null", {
2334                        "type":"record",
2335                        "name": "Inner",
2336                        "fields": [ {
2337                            "name":"z",
2338                            "type":"int"
2339                        }]
2340                    }]
2341                },
2342                {
2343                    "name":"b",
2344                    "type":"Inner"
2345                }
2346            ]
2347        }"#,
2348        )?;
2349
2350        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2351        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2352        let outer1 = Value::Record(vec![
2353            ("a".into(), inner_value1),
2354            ("b".into(), inner_value2.clone()),
2355        ]);
2356        outer1
2357            .resolve(&schema)
2358            .expect("Record definition defined in union must be resolved in other field");
2359        let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]);
2360        outer2
2361            .resolve(&schema)
2362            .expect("Record definition defined in union must be resolved in other field");
2363
2364        Ok(())
2365    }
2366
2367    #[test]
2368    fn test_avro_3461_test_multi_level_resolve_outer_namespace() -> TestResult {
2369        let schema = r#"
2370        {
2371          "name": "record_name",
2372          "namespace": "space",
2373          "type": "record",
2374          "fields": [
2375            {
2376              "name": "outer_field_1",
2377              "type": [
2378                        "null",
2379                        {
2380                            "type": "record",
2381                            "name": "middle_record_name",
2382                            "fields":[
2383                                {
2384                                    "name":"middle_field_1",
2385                                    "type":[
2386                                        "null",
2387                                        {
2388                                            "type":"record",
2389                                            "name":"inner_record_name",
2390                                            "fields":[
2391                                                {
2392                                                    "name":"inner_field_1",
2393                                                    "type":"double"
2394                                                }
2395                                            ]
2396                                        }
2397                                    ]
2398                                }
2399                            ]
2400                        }
2401                    ]
2402            },
2403            {
2404                "name": "outer_field_2",
2405                "type" : "space.inner_record_name"
2406            }
2407          ]
2408        }
2409        "#;
2410        let schema = Schema::parse_str(schema)?;
2411        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2412        let middle_record_variation_1 = Value::Record(vec![(
2413            "middle_field_1".into(),
2414            Value::Union(0, Box::new(Value::Null)),
2415        )]);
2416        let middle_record_variation_2 = Value::Record(vec![(
2417            "middle_field_1".into(),
2418            Value::Union(1, Box::new(inner_record.clone())),
2419        )]);
2420        let outer_record_variation_1 = Value::Record(vec![
2421            (
2422                "outer_field_1".into(),
2423                Value::Union(0, Box::new(Value::Null)),
2424            ),
2425            ("outer_field_2".into(), inner_record.clone()),
2426        ]);
2427        let outer_record_variation_2 = Value::Record(vec![
2428            (
2429                "outer_field_1".into(),
2430                Value::Union(1, Box::new(middle_record_variation_1)),
2431            ),
2432            ("outer_field_2".into(), inner_record.clone()),
2433        ]);
2434        let outer_record_variation_3 = Value::Record(vec![
2435            (
2436                "outer_field_1".into(),
2437                Value::Union(1, Box::new(middle_record_variation_2)),
2438            ),
2439            ("outer_field_2".into(), inner_record),
2440        ]);
2441
2442        outer_record_variation_1
2443            .resolve(&schema)
2444            .expect("Should be able to resolve value to the schema that is it's definition");
2445        outer_record_variation_2
2446            .resolve(&schema)
2447            .expect("Should be able to resolve value to the schema that is it's definition");
2448        outer_record_variation_3
2449            .resolve(&schema)
2450            .expect("Should be able to resolve value to the schema that is it's definition");
2451
2452        Ok(())
2453    }
2454
2455    #[test]
2456    fn test_avro_3461_test_multi_level_resolve_middle_namespace() -> TestResult {
2457        let schema = r#"
2458        {
2459          "name": "record_name",
2460          "namespace": "space",
2461          "type": "record",
2462          "fields": [
2463            {
2464              "name": "outer_field_1",
2465              "type": [
2466                        "null",
2467                        {
2468                            "type": "record",
2469                            "name": "middle_record_name",
2470                            "namespace":"middle_namespace",
2471                            "fields":[
2472                                {
2473                                    "name":"middle_field_1",
2474                                    "type":[
2475                                        "null",
2476                                        {
2477                                            "type":"record",
2478                                            "name":"inner_record_name",
2479                                            "fields":[
2480                                                {
2481                                                    "name":"inner_field_1",
2482                                                    "type":"double"
2483                                                }
2484                                            ]
2485                                        }
2486                                    ]
2487                                }
2488                            ]
2489                        }
2490                    ]
2491            },
2492            {
2493                "name": "outer_field_2",
2494                "type" : "middle_namespace.inner_record_name"
2495            }
2496          ]
2497        }
2498        "#;
2499        let schema = Schema::parse_str(schema)?;
2500        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2501        let middle_record_variation_1 = Value::Record(vec![(
2502            "middle_field_1".into(),
2503            Value::Union(0, Box::new(Value::Null)),
2504        )]);
2505        let middle_record_variation_2 = Value::Record(vec![(
2506            "middle_field_1".into(),
2507            Value::Union(1, Box::new(inner_record.clone())),
2508        )]);
2509        let outer_record_variation_1 = Value::Record(vec![
2510            (
2511                "outer_field_1".into(),
2512                Value::Union(0, Box::new(Value::Null)),
2513            ),
2514            ("outer_field_2".into(), inner_record.clone()),
2515        ]);
2516        let outer_record_variation_2 = Value::Record(vec![
2517            (
2518                "outer_field_1".into(),
2519                Value::Union(1, Box::new(middle_record_variation_1)),
2520            ),
2521            ("outer_field_2".into(), inner_record.clone()),
2522        ]);
2523        let outer_record_variation_3 = Value::Record(vec![
2524            (
2525                "outer_field_1".into(),
2526                Value::Union(1, Box::new(middle_record_variation_2)),
2527            ),
2528            ("outer_field_2".into(), inner_record),
2529        ]);
2530
2531        outer_record_variation_1
2532            .resolve(&schema)
2533            .expect("Should be able to resolve value to the schema that is it's definition");
2534        outer_record_variation_2
2535            .resolve(&schema)
2536            .expect("Should be able to resolve value to the schema that is it's definition");
2537        outer_record_variation_3
2538            .resolve(&schema)
2539            .expect("Should be able to resolve value to the schema that is it's definition");
2540
2541        Ok(())
2542    }
2543
2544    #[test]
2545    fn test_avro_3461_test_multi_level_resolve_inner_namespace() -> TestResult {
2546        let schema = r#"
2547        {
2548          "name": "record_name",
2549          "namespace": "space",
2550          "type": "record",
2551          "fields": [
2552            {
2553              "name": "outer_field_1",
2554              "type": [
2555                        "null",
2556                        {
2557                            "type": "record",
2558                            "name": "middle_record_name",
2559                            "namespace":"middle_namespace",
2560                            "fields":[
2561                                {
2562                                    "name":"middle_field_1",
2563                                    "type":[
2564                                        "null",
2565                                        {
2566                                            "type":"record",
2567                                            "name":"inner_record_name",
2568                                            "namespace":"inner_namespace",
2569                                            "fields":[
2570                                                {
2571                                                    "name":"inner_field_1",
2572                                                    "type":"double"
2573                                                }
2574                                            ]
2575                                        }
2576                                    ]
2577                                }
2578                            ]
2579                        }
2580                    ]
2581            },
2582            {
2583                "name": "outer_field_2",
2584                "type" : "inner_namespace.inner_record_name"
2585            }
2586          ]
2587        }
2588        "#;
2589        let schema = Schema::parse_str(schema)?;
2590
2591        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2592        let middle_record_variation_1 = Value::Record(vec![(
2593            "middle_field_1".into(),
2594            Value::Union(0, Box::new(Value::Null)),
2595        )]);
2596        let middle_record_variation_2 = Value::Record(vec![(
2597            "middle_field_1".into(),
2598            Value::Union(1, Box::new(inner_record.clone())),
2599        )]);
2600        let outer_record_variation_1 = Value::Record(vec![
2601            (
2602                "outer_field_1".into(),
2603                Value::Union(0, Box::new(Value::Null)),
2604            ),
2605            ("outer_field_2".into(), inner_record.clone()),
2606        ]);
2607        let outer_record_variation_2 = Value::Record(vec![
2608            (
2609                "outer_field_1".into(),
2610                Value::Union(1, Box::new(middle_record_variation_1)),
2611            ),
2612            ("outer_field_2".into(), inner_record.clone()),
2613        ]);
2614        let outer_record_variation_3 = Value::Record(vec![
2615            (
2616                "outer_field_1".into(),
2617                Value::Union(1, Box::new(middle_record_variation_2)),
2618            ),
2619            ("outer_field_2".into(), inner_record),
2620        ]);
2621
2622        outer_record_variation_1
2623            .resolve(&schema)
2624            .expect("Should be able to resolve value to the schema that is it's definition");
2625        outer_record_variation_2
2626            .resolve(&schema)
2627            .expect("Should be able to resolve value to the schema that is it's definition");
2628        outer_record_variation_3
2629            .resolve(&schema)
2630            .expect("Should be able to resolve value to the schema that is it's definition");
2631
2632        Ok(())
2633    }
2634
2635    #[test]
2636    fn test_avro_3460_validation_with_refs() -> TestResult {
2637        let schema = Schema::parse_str(
2638            r#"
2639        {
2640            "type":"record",
2641            "name":"TestStruct",
2642            "fields": [
2643                {
2644                    "name":"a",
2645                    "type":{
2646                        "type":"record",
2647                        "name": "Inner",
2648                        "fields": [ {
2649                            "name":"z",
2650                            "type":"int"
2651                        }]
2652                    }
2653                },
2654                {
2655                    "name":"b",
2656                    "type":"Inner"
2657                }
2658            ]
2659        }"#,
2660        )?;
2661
2662        let inner_value_right = Value::Record(vec![("z".into(), Value::Int(3))]);
2663        let inner_value_wrong1 = Value::Record(vec![("z".into(), Value::Null)]);
2664        let inner_value_wrong2 = Value::Record(vec![("a".into(), Value::String("testing".into()))]);
2665        let outer1 = Value::Record(vec![
2666            ("a".into(), inner_value_right.clone()),
2667            ("b".into(), inner_value_wrong1),
2668        ]);
2669
2670        let outer2 = Value::Record(vec![
2671            ("a".into(), inner_value_right),
2672            ("b".into(), inner_value_wrong2),
2673        ]);
2674
2675        assert!(
2676            !outer1.validate(&schema),
2677            "field b record is invalid against the schema"
2678        ); // this should pass, but doesn't
2679        assert!(
2680            !outer2.validate(&schema),
2681            "field b record is invalid against the schema"
2682        ); // this should pass, but doesn't
2683
2684        Ok(())
2685    }
2686
2687    #[test]
2688    fn test_avro_3460_validation_with_refs_real_struct() -> TestResult {
2689        use crate::ser::Serializer;
2690        use serde::Serialize;
2691
2692        #[derive(Serialize, Clone)]
2693        struct TestInner {
2694            z: i32,
2695        }
2696
2697        #[derive(Serialize)]
2698        struct TestRefSchemaStruct1 {
2699            a: TestInner,
2700            b: String, // could be literally anything
2701        }
2702
2703        #[derive(Serialize)]
2704        struct TestRefSchemaStruct2 {
2705            a: TestInner,
2706            b: i32, // could be literally anything
2707        }
2708
2709        #[derive(Serialize)]
2710        struct TestRefSchemaStruct3 {
2711            a: TestInner,
2712            b: Option<TestInner>, // could be literally anything
2713        }
2714
2715        let schema = Schema::parse_str(
2716            r#"
2717        {
2718            "type":"record",
2719            "name":"TestStruct",
2720            "fields": [
2721                {
2722                    "name":"a",
2723                    "type":{
2724                        "type":"record",
2725                        "name": "Inner",
2726                        "fields": [ {
2727                            "name":"z",
2728                            "type":"int"
2729                        }]
2730                    }
2731                },
2732                {
2733                    "name":"b",
2734                    "type":"Inner"
2735                }
2736            ]
2737        }"#,
2738        )?;
2739
2740        let test_inner = TestInner { z: 3 };
2741        let test_outer1 = TestRefSchemaStruct1 {
2742            a: test_inner.clone(),
2743            b: "testing".into(),
2744        };
2745        let test_outer2 = TestRefSchemaStruct2 {
2746            a: test_inner.clone(),
2747            b: 24,
2748        };
2749        let test_outer3 = TestRefSchemaStruct3 {
2750            a: test_inner,
2751            b: None,
2752        };
2753
2754        let mut ser = Serializer::default();
2755        let test_outer1: Value = test_outer1.serialize(&mut ser)?;
2756        let mut ser = Serializer::default();
2757        let test_outer2: Value = test_outer2.serialize(&mut ser)?;
2758        let mut ser = Serializer::default();
2759        let test_outer3: Value = test_outer3.serialize(&mut ser)?;
2760
2761        assert!(
2762            !test_outer1.validate(&schema),
2763            "field b record is invalid against the schema"
2764        );
2765        assert!(
2766            !test_outer2.validate(&schema),
2767            "field b record is invalid against the schema"
2768        );
2769        assert!(
2770            !test_outer3.validate(&schema),
2771            "field b record is invalid against the schema"
2772        );
2773
2774        Ok(())
2775    }
2776
2777    fn avro_3674_with_or_without_namespace(with_namespace: bool) -> TestResult {
2778        use crate::ser::Serializer;
2779        use serde::Serialize;
2780
2781        let schema_str = r#"
2782        {
2783            "type": "record",
2784            "name": "NamespacedMessage",
2785            [NAMESPACE]
2786            "fields": [
2787                {
2788                    "name": "field_a",
2789                    "type": {
2790                        "type": "record",
2791                        "name": "NestedMessage",
2792                        "fields": [
2793                            {
2794                                "name": "enum_a",
2795                                "type": {
2796                                "type": "enum",
2797                                "name": "EnumType",
2798                                "symbols": ["SYMBOL_1", "SYMBOL_2"],
2799                                "default": "SYMBOL_1"
2800                                }
2801                            },
2802                            {
2803                                "name": "enum_b",
2804                                "type": "EnumType"
2805                            }
2806                        ]
2807                    }
2808                }
2809            ]
2810        }
2811        "#;
2812        let schema_str = schema_str.replace(
2813            "[NAMESPACE]",
2814            if with_namespace {
2815                r#""namespace": "com.domain","#
2816            } else {
2817                ""
2818            },
2819        );
2820
2821        let schema = Schema::parse_str(&schema_str)?;
2822
2823        #[derive(Serialize)]
2824        enum EnumType {
2825            #[serde(rename = "SYMBOL_1")]
2826            Symbol1,
2827            #[serde(rename = "SYMBOL_2")]
2828            Symbol2,
2829        }
2830
2831        #[derive(Serialize)]
2832        struct FieldA {
2833            enum_a: EnumType,
2834            enum_b: EnumType,
2835        }
2836
2837        #[derive(Serialize)]
2838        struct NamespacedMessage {
2839            field_a: FieldA,
2840        }
2841
2842        let msg = NamespacedMessage {
2843            field_a: FieldA {
2844                enum_a: EnumType::Symbol2,
2845                enum_b: EnumType::Symbol1,
2846            },
2847        };
2848
2849        let mut ser = Serializer::default();
2850        let test_value: Value = msg.serialize(&mut ser)?;
2851        assert!(test_value.validate(&schema), "test_value should validate");
2852        assert!(
2853            test_value.resolve(&schema).is_ok(),
2854            "test_value should resolve"
2855        );
2856
2857        Ok(())
2858    }
2859
2860    #[test]
2861    fn test_avro_3674_validate_no_namespace_resolution() -> TestResult {
2862        avro_3674_with_or_without_namespace(false)
2863    }
2864
2865    #[test]
2866    fn test_avro_3674_validate_with_namespace_resolution() -> TestResult {
2867        avro_3674_with_or_without_namespace(true)
2868    }
2869
2870    fn avro_3688_schema_resolution_panic(set_field_b: bool) -> TestResult {
2871        use crate::ser::Serializer;
2872        use serde::{Deserialize, Serialize};
2873
2874        let schema_str = r#"{
2875            "type": "record",
2876            "name": "Message",
2877            "fields": [
2878                {
2879                    "name": "field_a",
2880                    "type": [
2881                        "null",
2882                        {
2883                            "name": "Inner",
2884                            "type": "record",
2885                            "fields": [
2886                                {
2887                                    "name": "inner_a",
2888                                    "type": "string"
2889                                }
2890                            ]
2891                        }
2892                    ],
2893                    "default": null
2894                },
2895                {
2896                    "name": "field_b",
2897                    "type": [
2898                        "null",
2899                        "Inner"
2900                    ],
2901                    "default": null
2902                }
2903            ]
2904        }"#;
2905
2906        #[derive(Serialize, Deserialize)]
2907        struct Inner {
2908            inner_a: String,
2909        }
2910
2911        #[derive(Serialize, Deserialize)]
2912        struct Message {
2913            field_a: Option<Inner>,
2914            field_b: Option<Inner>,
2915        }
2916
2917        let schema = Schema::parse_str(schema_str)?;
2918
2919        let msg = Message {
2920            field_a: Some(Inner {
2921                inner_a: "foo".to_string(),
2922            }),
2923            field_b: if set_field_b {
2924                Some(Inner {
2925                    inner_a: "bar".to_string(),
2926                })
2927            } else {
2928                None
2929            },
2930        };
2931
2932        let mut ser = Serializer::default();
2933        let test_value: Value = msg.serialize(&mut ser)?;
2934        assert!(test_value.validate(&schema), "test_value should validate");
2935        assert!(
2936            test_value.resolve(&schema).is_ok(),
2937            "test_value should resolve"
2938        );
2939
2940        Ok(())
2941    }
2942
2943    #[test]
2944    fn test_avro_3688_field_b_not_set() -> TestResult {
2945        avro_3688_schema_resolution_panic(false)
2946    }
2947
2948    #[test]
2949    fn test_avro_3688_field_b_set() -> TestResult {
2950        avro_3688_schema_resolution_panic(true)
2951    }
2952
2953    #[test]
2954    fn test_avro_3764_use_resolve_schemata() -> TestResult {
2955        let referenced_schema =
2956            r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
2957        let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": "enumForReference"}]}"#;
2958
2959        let value: serde_json::Value = serde_json::from_str(
2960            r#"
2961            {
2962                "reference": "A"
2963            }
2964        "#,
2965        )?;
2966
2967        let avro_value = Value::from(value);
2968
2969        let schemas = Schema::parse_list([main_schema, referenced_schema])?;
2970
2971        let main_schema = schemas.first().unwrap();
2972        let schemata: Vec<_> = schemas.iter().skip(1).collect();
2973
2974        let resolve_result = avro_value.clone().resolve_schemata(main_schema, schemata);
2975
2976        assert!(
2977            resolve_result.is_ok(),
2978            "result of resolving with schemata should be ok, got: {resolve_result:?}"
2979        );
2980
2981        let resolve_result = avro_value.resolve(main_schema);
2982        assert!(
2983            resolve_result.is_err(),
2984            "result of resolving without schemata should be err, got: {resolve_result:?}"
2985        );
2986
2987        Ok(())
2988    }
2989
2990    #[test]
2991    fn test_avro_3767_union_resolve_complex_refs() -> TestResult {
2992        let referenced_enum =
2993            r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
2994        let referenced_record = r#"{"name": "recordForReference", "type": "record", "fields": [{"name": "refInRecord", "type": "enumForReference"}]}"#;
2995        let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": ["null", "recordForReference"]}]}"#;
2996
2997        let value: serde_json::Value = serde_json::from_str(
2998            r#"
2999            {
3000                "reference": {
3001                    "refInRecord": "A"
3002                }
3003            }
3004        "#,
3005        )?;
3006
3007        let avro_value = Value::from(value);
3008
3009        let schemata = Schema::parse_list([referenced_enum, referenced_record, main_schema])?;
3010
3011        let main_schema = schemata.last().unwrap();
3012        let other_schemata: Vec<&Schema> = schemata.iter().take(2).collect();
3013
3014        let resolve_result = avro_value.resolve_schemata(main_schema, other_schemata);
3015
3016        assert!(
3017            resolve_result.is_ok(),
3018            "result of resolving with schemata should be ok, got: {resolve_result:?}"
3019        );
3020
3021        assert!(
3022            resolve_result?.validate_schemata(schemata.iter().collect()),
3023            "result of validation with schemata should be true"
3024        );
3025
3026        Ok(())
3027    }
3028
3029    #[test]
3030    fn test_avro_3782_incorrect_decimal_resolving() -> TestResult {
3031        let schema = r#"{"name": "decimalSchema", "logicalType": "decimal", "type": "fixed", "precision": 8, "scale": 0, "size": 8}"#;
3032
3033        let avro_value = Value::Decimal(Decimal::from(
3034            BigInt::from(12345678u32).to_signed_bytes_be(),
3035        ));
3036        let schema = Schema::parse_str(schema)?;
3037        let resolve_result = avro_value.resolve(&schema);
3038        assert!(
3039            resolve_result.is_ok(),
3040            "resolve result must be ok, got: {resolve_result:?}"
3041        );
3042
3043        Ok(())
3044    }
3045
3046    #[test]
3047    fn test_avro_3779_bigdecimal_resolving() -> TestResult {
3048        let schema =
3049            r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#;
3050
3051        let avro_value = Value::BigDecimal(BigDecimal::from(12345678u32));
3052        let schema = Schema::parse_str(schema)?;
3053        let resolve_result: AvroResult<Value> = avro_value.resolve(&schema);
3054        assert!(
3055            resolve_result.is_ok(),
3056            "resolve result must be ok, got: {resolve_result:?}"
3057        );
3058
3059        Ok(())
3060    }
3061
3062    #[test]
3063    fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult {
3064        let value = Value::Bytes(vec![97, 98, 99]);
3065        assert_eq!(
3066            value.resolve(&Schema::Fixed(FixedSchema {
3067                name: "test".into(),
3068                aliases: None,
3069                doc: None,
3070                size: 3,
3071                default: None,
3072                attributes: Default::default()
3073            }))?,
3074            Value::Fixed(3, vec![97, 98, 99])
3075        );
3076
3077        let value = Value::Bytes(vec![97, 99]);
3078        assert!(value
3079            .resolve(&Schema::Fixed(FixedSchema {
3080                name: "test".into(),
3081                aliases: None,
3082                doc: None,
3083                size: 3,
3084                default: None,
3085                attributes: Default::default()
3086            }))
3087            .is_err(),);
3088
3089        let value = Value::Bytes(vec![97, 98, 99, 100]);
3090        assert!(value
3091            .resolve(&Schema::Fixed(FixedSchema {
3092                name: "test".into(),
3093                aliases: None,
3094                doc: None,
3095                size: 3,
3096                default: None,
3097                attributes: Default::default()
3098            }))
3099            .is_err(),);
3100
3101        Ok(())
3102    }
3103
3104    #[test]
3105    fn avro_3928_from_serde_value_to_types_value() {
3106        assert_eq!(Value::from(serde_json::Value::Null), Value::Null);
3107        assert_eq!(Value::from(json!(true)), Value::Boolean(true));
3108        assert_eq!(Value::from(json!(false)), Value::Boolean(false));
3109        assert_eq!(Value::from(json!(0)), Value::Int(0));
3110        assert_eq!(Value::from(json!(i32::MIN)), Value::Int(i32::MIN));
3111        assert_eq!(Value::from(json!(i32::MAX)), Value::Int(i32::MAX));
3112        assert_eq!(
3113            Value::from(json!(i32::MIN as i64 - 1)),
3114            Value::Long(i32::MIN as i64 - 1)
3115        );
3116        assert_eq!(
3117            Value::from(json!(i32::MAX as i64 + 1)),
3118            Value::Long(i32::MAX as i64 + 1)
3119        );
3120        assert_eq!(Value::from(json!(1.23)), Value::Double(1.23));
3121        assert_eq!(Value::from(json!(-1.23)), Value::Double(-1.23));
3122        assert_eq!(Value::from(json!(u64::MIN)), Value::Int(u64::MIN as i32));
3123        assert_eq!(Value::from(json!(u64::MAX)), Value::Long(u64::MAX as i64));
3124        assert_eq!(
3125            Value::from(json!("some text")),
3126            Value::String("some text".into())
3127        );
3128        assert_eq!(
3129            Value::from(json!(["text1", "text2", "text3"])),
3130            Value::Array(vec![
3131                Value::String("text1".into()),
3132                Value::String("text2".into()),
3133                Value::String("text3".into())
3134            ])
3135        );
3136        assert_eq!(
3137            Value::from(json!({"key1": "value1", "key2": "value2"})),
3138            Value::Map(
3139                vec![
3140                    ("key1".into(), Value::String("value1".into())),
3141                    ("key2".into(), Value::String("value2".into()))
3142                ]
3143                .into_iter()
3144                .collect()
3145            )
3146        );
3147    }
3148
3149    #[test]
3150    fn avro_4024_resolve_double_from_unknown_string_err() -> TestResult {
3151        let schema = Schema::parse_str(r#"{"type": "double"}"#)?;
3152        let value = Value::String("unknown".to_owned());
3153        match value.resolve(&schema) {
3154            Err(err @ Error::GetDouble(_)) => {
3155                assert_eq!(
3156                    format!("{err:?}"),
3157                    r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3158                );
3159            }
3160            other => {
3161                panic!("Expected Error::GetDouble, got {other:?}");
3162            }
3163        }
3164        Ok(())
3165    }
3166
3167    #[test]
3168    fn avro_4024_resolve_float_from_unknown_string_err() -> TestResult {
3169        let schema = Schema::parse_str(r#"{"type": "float"}"#)?;
3170        let value = Value::String("unknown".to_owned());
3171        match value.resolve(&schema) {
3172            Err(err @ Error::GetFloat(_)) => {
3173                assert_eq!(
3174                    format!("{err:?}"),
3175                    r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3176                );
3177            }
3178            other => {
3179                panic!("Expected Error::GetFloat, got {other:?}");
3180            }
3181        }
3182        Ok(())
3183    }
3184
3185    #[test]
3186    fn avro_4029_resolve_from_unsupported_err() -> TestResult {
3187        let data: Vec<(&str, Value, &str)> = vec!(
3188            (r#"{ "name": "NAME", "type": "int" }"#, Value::Float(123_f32), "Expected Value::Int, got: Float(123.0)"),
3189            (r#"{ "name": "NAME", "type": "fixed", "size": 3 }"#, Value::Float(123_f32), "String expected for fixed, got: Float(123.0)"),
3190            (r#"{ "name": "NAME", "type": "bytes" }"#, Value::Float(123_f32), "Expected Value::Bytes, got: Float(123.0)"),
3191            (r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#, Value::String("abc-1234".into()), "Failed to convert &str to UUID: invalid group count: expected 5, found 2"),
3192            (r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#, Value::Float(123_f32), "Expected Value::Uuid, got: Float(123.0)"),
3193            (r#"{ "name": "NAME", "type": "bytes", "logicalType": "big-decimal" }"#, Value::Float(123_f32), "Expected Value::BigDecimal, got: Float(123.0)"),
3194            (r#"{ "name": "NAME", "type": "fixed", "size": 12, "logicalType": "duration" }"#, Value::Float(123_f32), "Expected Value::Duration or Value::Fixed(12), got: Float(123.0)"),
3195            (r#"{ "name": "NAME", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3 }"#, Value::Float(123_f32), "Expected Value::Decimal, Value::Bytes or Value::Fixed, got: Float(123.0)"),
3196            (r#"{ "name": "NAME", "type": "bytes" }"#, Value::Array(vec!(Value::Long(256_i64))), "Unable to convert to u8, got Int(256)"),
3197            (r#"{ "name": "NAME", "type": "int", "logicalType": "date" }"#, Value::Float(123_f32), "Expected Value::Date or Value::Int, got: Float(123.0)"),
3198            (r#"{ "name": "NAME", "type": "int", "logicalType": "time-millis" }"#, Value::Float(123_f32), "Expected Value::TimeMillis or Value::Int, got: Float(123.0)"),
3199            (r#"{ "name": "NAME", "type": "long", "logicalType": "time-micros" }"#, Value::Float(123_f32), "Expected Value::TimeMicros, Value::Long or Value::Int, got: Float(123.0)"),
3200            (r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-millis" }"#, Value::Float(123_f32), "Expected Value::TimestampMillis, Value::Long or Value::Int, got: Float(123.0)"),
3201            (r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-micros" }"#, Value::Float(123_f32), "Expected Value::TimestampMicros, Value::Long or Value::Int, got: Float(123.0)"),
3202            (r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-nanos" }"#, Value::Float(123_f32), "Expected Value::TimestampNanos, Value::Long or Value::Int, got: Float(123.0)"),
3203            (r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-millis" }"#, Value::Float(123_f32), "Expected Value::LocalTimestampMillis, Value::Long or Value::Int, got: Float(123.0)"),
3204            (r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-micros" }"#, Value::Float(123_f32), "Expected Value::LocalTimestampMicros, Value::Long or Value::Int, got: Float(123.0)"),
3205            (r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-nanos" }"#, Value::Float(123_f32), "Expected Value::LocalTimestampNanos, Value::Long or Value::Int, got: Float(123.0)"),
3206            (r#"{ "name": "NAME", "type": "null" }"#, Value::Float(123_f32), "Expected Value::Null, got: Float(123.0)"),
3207            (r#"{ "name": "NAME", "type": "boolean" }"#, Value::Float(123_f32), "Expected Value::Boolean, got: Float(123.0)"),
3208            (r#"{ "name": "NAME", "type": "int" }"#, Value::Float(123_f32), "Expected Value::Int, got: Float(123.0)"),
3209            (r#"{ "name": "NAME", "type": "long" }"#, Value::Float(123_f32), "Expected Value::Long or Value::Int, got: Float(123.0)"),
3210            (r#"{ "name": "NAME", "type": "float" }"#, Value::Boolean(false), r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#),
3211            (r#"{ "name": "NAME", "type": "double" }"#, Value::Boolean(false), r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#),
3212            (r#"{ "name": "NAME", "type": "string" }"#, Value::Boolean(false), "Expected Value::String, Value::Bytes or Value::Fixed, got: Boolean(false)"),
3213            (r#"{ "name": "NAME", "type": "enum", "symbols": ["one", "two"] }"#, Value::Boolean(false), "Expected Value::Enum, got: Boolean(false)"),
3214        );
3215
3216        for (schema_str, value, expected_error) in data {
3217            let schema = Schema::parse_str(schema_str)?;
3218            match value.resolve(&schema) {
3219                Err(error) => {
3220                    assert_eq!(format!("{error}"), expected_error);
3221                }
3222                other => {
3223                    panic!("Expected '{expected_error}', got {other:?}");
3224                }
3225            }
3226        }
3227        Ok(())
3228    }
3229
3230    #[test]
3231    fn avro_rs_130_get_from_record() -> TestResult {
3232        let schema = r#"
3233        {
3234            "type": "record",
3235            "name": "NamespacedMessage",
3236            "namespace": "space",
3237            "fields": [
3238                {
3239                    "name": "foo",
3240                    "type": "string"
3241                },
3242                {
3243                    "name": "bar",
3244                    "type": "long"
3245                }
3246            ]
3247        }
3248        "#;
3249
3250        let schema = Schema::parse_str(schema)?;
3251        let mut record = Record::new(&schema).unwrap();
3252        record.put("foo", "hello");
3253        record.put("bar", 123_i64);
3254
3255        assert_eq!(
3256            record.get("foo").unwrap(),
3257            &Value::String("hello".to_string())
3258        );
3259        assert_eq!(record.get("bar").unwrap(), &Value::Long(123));
3260
3261        // also make sure it doesn't fail but return None for non-existing field
3262        assert_eq!(record.get("baz"), None);
3263
3264        Ok(())
3265    }
3266}