apache_avro/
types.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling the intermediate representation of Avro values.
19use crate::{
20    bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
21    decimal::Decimal,
22    duration::Duration,
23    schema::{
24        DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
25        RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
26    },
27    AvroResult, Error,
28};
29use bigdecimal::BigDecimal;
30use serde_json::{Number, Value as JsonValue};
31use std::{
32    borrow::Borrow,
33    collections::{BTreeMap, HashMap},
34    fmt::Debug,
35    hash::BuildHasher,
36    str::FromStr,
37};
38use uuid::Uuid;
39
40/// Compute the maximum decimal value precision of a byte array of length `len` could hold.
41fn max_prec_for_len(len: usize) -> Result<usize, Error> {
42    let len = i32::try_from(len).map_err(|e| Error::ConvertLengthToI32(e, len))?;
43    Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
44}
45
46/// A valid Avro value.
47///
48/// More information about Avro values can be found in the [Avro
49/// Specification](https://avro.apache.org/docs/current/spec.html#schemas)
50#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
51#[strum_discriminants(name(ValueKind))]
52pub enum Value {
53    /// A `null` Avro value.
54    Null,
55    /// A `boolean` Avro value.
56    Boolean(bool),
57    /// A `int` Avro value.
58    Int(i32),
59    /// A `long` Avro value.
60    Long(i64),
61    /// A `float` Avro value.
62    Float(f32),
63    /// A `double` Avro value.
64    Double(f64),
65    /// A `bytes` Avro value.
66    Bytes(Vec<u8>),
67    /// A `string` Avro value.
68    String(String),
69    /// A `fixed` Avro value.
70    /// The size of the fixed value is represented as a `usize`.
71    Fixed(usize, Vec<u8>),
72    /// An `enum` Avro value.
73    ///
74    /// An Enum is represented by a symbol and its position in the symbols list
75    /// of its corresponding schema.
76    /// This allows schema-less encoding, as well as schema resolution while
77    /// reading values.
78    Enum(u32, String),
79    /// An `union` Avro value.
80    ///
81    /// A Union is represented by the value it holds and its position in the type list
82    /// of its corresponding schema
83    /// This allows schema-less encoding, as well as schema resolution while
84    /// reading values.
85    Union(u32, Box<Value>),
86    /// An `array` Avro value.
87    Array(Vec<Value>),
88    /// A `map` Avro value.
89    Map(HashMap<String, Value>),
90    /// A `record` Avro value.
91    ///
92    /// A Record is represented by a vector of (`<record name>`, `value`).
93    /// This allows schema-less encoding.
94    ///
95    /// See [Record](types.Record) for a more user-friendly support.
96    Record(Vec<(String, Value)>),
97    /// A date value.
98    ///
99    /// Serialized and deserialized as `i32` directly. Can only be deserialized properly with a
100    /// schema.
101    Date(i32),
102    /// An Avro Decimal value. Bytes are in big-endian order, per the Avro spec.
103    Decimal(Decimal),
104    /// An Avro Decimal value.
105    BigDecimal(BigDecimal),
106    /// Time in milliseconds.
107    TimeMillis(i32),
108    /// Time in microseconds.
109    TimeMicros(i64),
110    /// Timestamp in milliseconds.
111    TimestampMillis(i64),
112    /// Timestamp in microseconds.
113    TimestampMicros(i64),
114    /// Timestamp in nanoseconds.
115    TimestampNanos(i64),
116    /// Local timestamp in milliseconds.
117    LocalTimestampMillis(i64),
118    /// Local timestamp in microseconds.
119    LocalTimestampMicros(i64),
120    /// Local timestamp in nanoseconds.
121    LocalTimestampNanos(i64),
122    /// Avro Duration. An amount of time defined by months, days and milliseconds.
123    Duration(Duration),
124    /// Universally unique identifier.
125    Uuid(Uuid),
126}
127
128/// Any structure implementing the [ToAvro](trait.ToAvro.html) trait will be usable
129/// from a [Writer](../writer/struct.Writer.html).
130#[deprecated(
131    since = "0.11.0",
132    note = "Please use Value::from, Into::into or value.into() instead"
133)]
134pub trait ToAvro {
135    /// Transforms this value into an Avro-compatible [Value](enum.Value.html).
136    fn avro(self) -> Value;
137}
138
139#[allow(deprecated)]
140impl<T: Into<Value>> ToAvro for T {
141    fn avro(self) -> Value {
142        self.into()
143    }
144}
145
146macro_rules! to_value(
147    ($type:ty, $variant_constructor:expr) => (
148        impl From<$type> for Value {
149            fn from(value: $type) -> Self {
150                $variant_constructor(value)
151            }
152        }
153    );
154);
155
156to_value!(bool, Value::Boolean);
157to_value!(i32, Value::Int);
158to_value!(i64, Value::Long);
159to_value!(f32, Value::Float);
160to_value!(f64, Value::Double);
161to_value!(String, Value::String);
162to_value!(Vec<u8>, Value::Bytes);
163to_value!(uuid::Uuid, Value::Uuid);
164to_value!(Decimal, Value::Decimal);
165to_value!(BigDecimal, Value::BigDecimal);
166to_value!(Duration, Value::Duration);
167
168impl From<()> for Value {
169    fn from(_: ()) -> Self {
170        Self::Null
171    }
172}
173
174impl From<usize> for Value {
175    fn from(value: usize) -> Self {
176        i64::try_from(value)
177            .expect("cannot convert usize to i64")
178            .into()
179    }
180}
181
182impl From<&str> for Value {
183    fn from(value: &str) -> Self {
184        Self::String(value.to_owned())
185    }
186}
187
188impl From<&[u8]> for Value {
189    fn from(value: &[u8]) -> Self {
190        Self::Bytes(value.to_owned())
191    }
192}
193
194impl<T> From<Option<T>> for Value
195where
196    T: Into<Self>,
197{
198    fn from(value: Option<T>) -> Self {
199        // FIXME: this is incorrect in case first type in union is not "none"
200        Self::Union(
201            value.is_some() as u32,
202            Box::new(value.map_or_else(|| Self::Null, Into::into)),
203        )
204    }
205}
206
207impl<K, V, S> From<HashMap<K, V, S>> for Value
208where
209    K: Into<String>,
210    V: Into<Self>,
211    S: BuildHasher,
212{
213    fn from(value: HashMap<K, V, S>) -> Self {
214        Self::Map(
215            value
216                .into_iter()
217                .map(|(key, value)| (key.into(), value.into()))
218                .collect(),
219        )
220    }
221}
222
223/// Utility interface to build `Value::Record` objects.
224#[derive(Debug, Clone)]
225pub struct Record<'a> {
226    /// List of fields contained in the record.
227    /// Ordered according to the fields in the schema given to create this
228    /// `Record` object. Any unset field defaults to `Value::Null`.
229    pub fields: Vec<(String, Value)>,
230    schema_lookup: &'a BTreeMap<String, usize>,
231}
232
233impl<'a> Record<'a> {
234    /// Create a `Record` given a `Schema`.
235    ///
236    /// If the `Schema` is not a `Schema::Record` variant, `None` will be returned.
237    pub fn new(schema: &Schema) -> Option<Record> {
238        match *schema {
239            Schema::Record(RecordSchema {
240                fields: ref schema_fields,
241                lookup: ref schema_lookup,
242                ..
243            }) => {
244                let mut fields = Vec::with_capacity(schema_fields.len());
245                for schema_field in schema_fields.iter() {
246                    fields.push((schema_field.name.clone(), Value::Null));
247                }
248
249                Some(Record {
250                    fields,
251                    schema_lookup,
252                })
253            }
254            _ => None,
255        }
256    }
257
258    /// Put a compatible value (implementing the `ToAvro` trait) in the
259    /// `Record` for a given `field` name.
260    ///
261    /// **NOTE** Only ensure that the field name is present in the `Schema` given when creating
262    /// this `Record`. Does not perform any schema validation.
263    pub fn put<V>(&mut self, field: &str, value: V)
264    where
265        V: Into<Value>,
266    {
267        if let Some(&position) = self.schema_lookup.get(field) {
268            self.fields[position].1 = value.into()
269        }
270    }
271}
272
273impl<'a> From<Record<'a>> for Value {
274    fn from(value: Record<'a>) -> Self {
275        Self::Record(value.fields)
276    }
277}
278
279impl From<JsonValue> for Value {
280    fn from(value: JsonValue) -> Self {
281        match value {
282            JsonValue::Null => Self::Null,
283            JsonValue::Bool(b) => b.into(),
284            JsonValue::Number(ref n) if n.is_i64() => {
285                let n = n.as_i64().unwrap();
286                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
287                    Value::Int(n as i32)
288                } else {
289                    Value::Long(n)
290                }
291            }
292            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
293            JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), // TODO: Not so great
294            JsonValue::String(s) => s.into(),
295            JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
296            JsonValue::Object(items) => Value::Map(
297                items
298                    .into_iter()
299                    .map(|(key, value)| (key, value.into()))
300                    .collect(),
301            ),
302        }
303    }
304}
305
306/// Convert Avro values to Json values
307impl TryFrom<Value> for JsonValue {
308    type Error = crate::error::Error;
309    fn try_from(value: Value) -> AvroResult<Self> {
310        match value {
311            Value::Null => Ok(Self::Null),
312            Value::Boolean(b) => Ok(Self::Bool(b)),
313            Value::Int(i) => Ok(Self::Number(i.into())),
314            Value::Long(l) => Ok(Self::Number(l.into())),
315            Value::Float(f) => Number::from_f64(f.into())
316                .map(Self::Number)
317                .ok_or_else(|| Error::ConvertF64ToJson(f.into())),
318            Value::Double(d) => Number::from_f64(d)
319                .map(Self::Number)
320                .ok_or(Error::ConvertF64ToJson(d)),
321            Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
322            Value::String(s) => Ok(Self::String(s)),
323            Value::Fixed(_size, items) => {
324                Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
325            }
326            Value::Enum(_i, s) => Ok(Self::String(s)),
327            Value::Union(_i, b) => Self::try_from(*b),
328            Value::Array(items) => items
329                .into_iter()
330                .map(Self::try_from)
331                .collect::<Result<Vec<_>, _>>()
332                .map(Self::Array),
333            Value::Map(items) => items
334                .into_iter()
335                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
336                .collect::<Result<Vec<_>, _>>()
337                .map(|v| Self::Object(v.into_iter().collect())),
338            Value::Record(items) => items
339                .into_iter()
340                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
341                .collect::<Result<Vec<_>, _>>()
342                .map(|v| Self::Object(v.into_iter().collect())),
343            Value::Date(d) => Ok(Self::Number(d.into())),
344            Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
345                .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
346            Value::BigDecimal(ref bg) => {
347                let vec1: Vec<u8> = serialize_big_decimal(bg);
348                Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
349            }
350            Value::TimeMillis(t) => Ok(Self::Number(t.into())),
351            Value::TimeMicros(t) => Ok(Self::Number(t.into())),
352            Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
353            Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
354            Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
355            Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
356            Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
357            Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
358            Value::Duration(d) => Ok(Self::Array(
359                <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
360            )),
361            Value::Uuid(uuid) => Ok(Self::String(uuid.as_hyphenated().to_string())),
362        }
363    }
364}
365
366impl Value {
367    /// Validate the value against the given [Schema](../schema/enum.Schema.html).
368    ///
369    /// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
370    /// for the full set of rules of schema validation.
371    pub fn validate(&self, schema: &Schema) -> bool {
372        self.validate_schemata(vec![schema])
373    }
374
375    pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
376        let rs = ResolvedSchema::try_from(schemata.clone())
377            .expect("Schemata didn't successfully resolve");
378        let schemata_len = schemata.len();
379        schemata.iter().any(|schema| {
380            let enclosing_namespace = schema.namespace();
381
382            match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
383                Some(reason) => {
384                    let log_message = format!(
385                        "Invalid value: {:?} for schema: {:?}. Reason: {}",
386                        self, schema, reason
387                    );
388                    if schemata_len == 1 {
389                        error!("{}", log_message);
390                    } else {
391                        debug!("{}", log_message);
392                    };
393                    false
394                }
395                None => true,
396            }
397        })
398    }
399
400    fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
401        match (accumulator, other) {
402            (None, None) => None,
403            (None, s @ Some(_)) => s,
404            (s @ Some(_), None) => s,
405            (Some(reason1), Some(reason2)) => Some(format!("{reason1}\n{reason2}")),
406        }
407    }
408
409    /// Validates the value against the provided schema.
410    pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
411        &self,
412        schema: &Schema,
413        names: &HashMap<Name, S>,
414        enclosing_namespace: &Namespace,
415    ) -> Option<String> {
416        match (self, schema) {
417            (_, Schema::Ref { name }) => {
418                let name = name.fully_qualified_name(enclosing_namespace);
419                names.get(&name).map_or_else(
420                    || {
421                        Some(format!(
422                            "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
423                            name,
424                            names.keys()
425                        ))
426                    },
427                    |s| self.validate_internal(s.borrow(), names, &name.namespace),
428                )
429            }
430            (&Value::Null, &Schema::Null) => None,
431            (&Value::Boolean(_), &Schema::Boolean) => None,
432            (&Value::Int(_), &Schema::Int) => None,
433            (&Value::Int(_), &Schema::Date) => None,
434            (&Value::Int(_), &Schema::TimeMillis) => None,
435            (&Value::Int(_), &Schema::Long) => None,
436            (&Value::Long(_), &Schema::Long) => None,
437            (&Value::Long(_), &Schema::TimeMicros) => None,
438            (&Value::Long(_), &Schema::TimestampMillis) => None,
439            (&Value::Long(_), &Schema::TimestampMicros) => None,
440            (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
441            (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
442            (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
443            (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
444            (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
445            (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
446            (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
447            (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
448            (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
449            (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
450            (&Value::Date(_), &Schema::Date) => None,
451            (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
452            (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
453            (&Value::Duration(_), &Schema::Duration) => None,
454            (&Value::Uuid(_), &Schema::Uuid) => None,
455            (&Value::Float(_), &Schema::Float) => None,
456            (&Value::Float(_), &Schema::Double) => None,
457            (&Value::Double(_), &Schema::Double) => None,
458            (&Value::Bytes(_), &Schema::Bytes) => None,
459            (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
460            (&Value::String(_), &Schema::String) => None,
461            (&Value::String(_), &Schema::Uuid) => None,
462            (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => {
463                if n != size {
464                    Some(format!(
465                        "The value's size ({n}) is different than the schema's size ({size})"
466                    ))
467                } else {
468                    None
469                }
470            }
471            (Value::Bytes(b), &Schema::Fixed(FixedSchema { size, .. })) => {
472                if b.len() != size {
473                    Some(format!(
474                        "The bytes' length ({}) is different than the schema's size ({})",
475                        b.len(),
476                        size
477                    ))
478                } else {
479                    None
480                }
481            }
482            (&Value::Fixed(n, _), &Schema::Duration) => {
483                if n != 12 {
484                    Some(format!(
485                        "The value's size ('{n}') must be exactly 12 to be a Duration"
486                    ))
487                } else {
488                    None
489                }
490            }
491            // TODO: check precision against n
492            (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
493            (Value::String(s), Schema::Enum(EnumSchema { symbols, .. })) => {
494                if !symbols.contains(s) {
495                    Some(format!("'{s}' is not a member of the possible symbols"))
496                } else {
497                    None
498                }
499            }
500            (
501                &Value::Enum(i, ref s),
502                Schema::Enum(EnumSchema {
503                    symbols, default, ..
504                }),
505            ) => symbols
506                .get(i as usize)
507                .map(|ref symbol| {
508                    if symbol != &s {
509                        Some(format!("Symbol '{s}' is not at position '{i}'"))
510                    } else {
511                        None
512                    }
513                })
514                .unwrap_or_else(|| match default {
515                    Some(_) => None,
516                    None => Some(format!("No symbol at position '{i}'")),
517                }),
518            // (&Value::Union(None), &Schema::Union(_)) => None,
519            (&Value::Union(i, ref value), Schema::Union(inner)) => inner
520                .variants()
521                .get(i as usize)
522                .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
523                .unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
524            (v, Schema::Union(inner)) => {
525                match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
526                    Some(_) => None,
527                    None => Some("Could not find matching type in union".to_string()),
528                }
529            }
530            (Value::Array(items), Schema::Array(inner)) => items.iter().fold(None, |acc, item| {
531                Value::accumulate(
532                    acc,
533                    item.validate_internal(&inner.items, names, enclosing_namespace),
534                )
535            }),
536            (Value::Map(items), Schema::Map(inner)) => {
537                items.iter().fold(None, |acc, (_, value)| {
538                    Value::accumulate(
539                        acc,
540                        value.validate_internal(&inner.types, names, enclosing_namespace),
541                    )
542                })
543            }
544            (
545                Value::Record(record_fields),
546                Schema::Record(RecordSchema {
547                    fields,
548                    lookup,
549                    name,
550                    ..
551                }),
552            ) => {
553                let non_nullable_fields_count =
554                    fields.iter().filter(|&rf| !rf.is_nullable()).count();
555
556                // If the record contains fewer fields as required fields by the schema, it is invalid.
557                if record_fields.len() < non_nullable_fields_count {
558                    return Some(format!(
559                        "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
560                        record_fields.len(),
561                        non_nullable_fields_count
562                    ));
563                } else if record_fields.len() > fields.len() {
564                    return Some(format!(
565                        "The value's records length ({}) is greater than the schema's ({} fields)",
566                        record_fields.len(),
567                        fields.len(),
568                    ));
569                }
570
571                record_fields
572                    .iter()
573                    .fold(None, |acc, (field_name, record_field)| {
574                        let record_namespace = if name.namespace.is_none() {
575                            enclosing_namespace
576                        } else {
577                            &name.namespace
578                        };
579                        match lookup.get(field_name) {
580                            Some(idx) => {
581                                let field = &fields[*idx];
582                                Value::accumulate(
583                                    acc,
584                                    record_field.validate_internal(
585                                        &field.schema,
586                                        names,
587                                        record_namespace,
588                                    ),
589                                )
590                            }
591                            None => Value::accumulate(
592                                acc,
593                                Some(format!("There is no schema field for field '{field_name}'")),
594                            ),
595                        }
596                    })
597            }
598            (Value::Map(items), Schema::Record(RecordSchema { fields, .. })) => {
599                fields.iter().fold(None, |acc, field| {
600                    if let Some(item) = items.get(&field.name) {
601                        let res = item.validate_internal(&field.schema, names, enclosing_namespace);
602                        Value::accumulate(acc, res)
603                    } else if !field.is_nullable() {
604                        Value::accumulate(
605                            acc,
606                            Some(format!(
607                                "Field with name '{:?}' is not a member of the map items",
608                                field.name
609                            )),
610                        )
611                    } else {
612                        acc
613                    }
614                })
615            }
616            (v, s) => Some(format!(
617                "Unsupported value-schema combination! Value: {:?}, schema: {:?}",
618                v, s
619            )),
620        }
621    }
622
623    /// Attempt to perform schema resolution on the value, with the given
624    /// [Schema](../schema/enum.Schema.html).
625    ///
626    /// See [Schema Resolution](https://avro.apache.org/docs/current/spec.html#Schema+Resolution)
627    /// in the Avro specification for the full set of rules of schema
628    /// resolution.
629    pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
630        let enclosing_namespace = schema.namespace();
631        let rs = ResolvedSchema::try_from(schema)?;
632        self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
633    }
634
635    /// Attempt to perform schema resolution on the value, with the given
636    /// [Schema](../schema/enum.Schema.html) and set of schemas to use for Refs resolution.
637    ///
638    /// See [Schema Resolution](https://avro.apache.org/docs/current/spec.html#Schema+Resolution)
639    /// in the Avro specification for the full set of rules of schema
640    /// resolution.
641    pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
642        let enclosing_namespace = schema.namespace();
643        let rs = ResolvedSchema::try_from(schemata)?;
644        self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
645    }
646
647    pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
648        mut self,
649        schema: &Schema,
650        names: &HashMap<Name, S>,
651        enclosing_namespace: &Namespace,
652        field_default: &Option<JsonValue>,
653    ) -> AvroResult<Self> {
654        // Check if this schema is a union, and if the reader schema is not.
655        if SchemaKind::from(&self) == SchemaKind::Union
656            && SchemaKind::from(schema) != SchemaKind::Union
657        {
658            // Pull out the Union, and attempt to resolve against it.
659            let v = match self {
660                Value::Union(_i, b) => *b,
661                _ => unreachable!(),
662            };
663            self = v;
664        }
665        match *schema {
666            Schema::Ref { ref name } => {
667                let name = name.fully_qualified_name(enclosing_namespace);
668
669                if let Some(resolved) = names.get(&name) {
670                    debug!("Resolved {:?}", name);
671                    self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
672                } else {
673                    error!("Failed to resolve schema {:?}", name);
674                    Err(Error::SchemaResolutionError(name.clone()))
675                }
676            }
677            Schema::Null => self.resolve_null(),
678            Schema::Boolean => self.resolve_boolean(),
679            Schema::Int => self.resolve_int(),
680            Schema::Long => self.resolve_long(),
681            Schema::Float => self.resolve_float(),
682            Schema::Double => self.resolve_double(),
683            Schema::Bytes => self.resolve_bytes(),
684            Schema::String => self.resolve_string(),
685            Schema::Fixed(FixedSchema { size, .. }) => self.resolve_fixed(size),
686            Schema::Union(ref inner) => {
687                self.resolve_union(inner, names, enclosing_namespace, field_default)
688            }
689            Schema::Enum(EnumSchema {
690                ref symbols,
691                ref default,
692                ..
693            }) => self.resolve_enum(symbols, default, field_default),
694            Schema::Array(ref inner) => {
695                self.resolve_array(&inner.items, names, enclosing_namespace)
696            }
697            Schema::Map(ref inner) => self.resolve_map(&inner.types, names, enclosing_namespace),
698            Schema::Record(RecordSchema { ref fields, .. }) => {
699                self.resolve_record(fields, names, enclosing_namespace)
700            }
701            Schema::Decimal(DecimalSchema {
702                scale,
703                precision,
704                ref inner,
705            }) => self.resolve_decimal(precision, scale, inner),
706            Schema::BigDecimal => self.resolve_bigdecimal(),
707            Schema::Date => self.resolve_date(),
708            Schema::TimeMillis => self.resolve_time_millis(),
709            Schema::TimeMicros => self.resolve_time_micros(),
710            Schema::TimestampMillis => self.resolve_timestamp_millis(),
711            Schema::TimestampMicros => self.resolve_timestamp_micros(),
712            Schema::TimestampNanos => self.resolve_timestamp_nanos(),
713            Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
714            Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
715            Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
716            Schema::Duration => self.resolve_duration(),
717            Schema::Uuid => self.resolve_uuid(),
718        }
719    }
720
721    fn resolve_uuid(self) -> Result<Self, Error> {
722        Ok(match self {
723            uuid @ Value::Uuid(_) => uuid,
724            Value::String(ref string) => {
725                Value::Uuid(Uuid::from_str(string).map_err(Error::ConvertStrToUuid)?)
726            }
727            other => return Err(Error::GetUuid(other.into())),
728        })
729    }
730
731    fn resolve_bigdecimal(self) -> Result<Self, Error> {
732        Ok(match self {
733            bg @ Value::BigDecimal(_) => bg,
734            Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
735            other => return Err(Error::GetBigdecimal(other.into())),
736        })
737    }
738
739    fn resolve_duration(self) -> Result<Self, Error> {
740        Ok(match self {
741            duration @ Value::Duration { .. } => duration,
742            Value::Fixed(size, bytes) => {
743                if size != 12 {
744                    return Err(Error::GetDecimalFixedBytes(size));
745                }
746                Value::Duration(Duration::from([
747                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
748                    bytes[8], bytes[9], bytes[10], bytes[11],
749                ]))
750            }
751            other => return Err(Error::ResolveDuration(other.into())),
752        })
753    }
754
755    fn resolve_decimal(
756        self,
757        precision: Precision,
758        scale: Scale,
759        inner: &Schema,
760    ) -> Result<Self, Error> {
761        if scale > precision {
762            return Err(Error::GetScaleAndPrecision { scale, precision });
763        }
764        match inner {
765            &Schema::Fixed(FixedSchema { size, .. }) => {
766                if max_prec_for_len(size)? < precision {
767                    return Err(Error::GetScaleWithFixedSize { size, precision });
768                }
769            }
770            Schema::Bytes => (),
771            _ => return Err(Error::ResolveDecimalSchema(inner.into())),
772        };
773        match self {
774            Value::Decimal(num) => {
775                let num_bytes = num.len();
776                if max_prec_for_len(num_bytes)? < precision {
777                    Err(Error::ComparePrecisionAndSize {
778                        precision,
779                        num_bytes,
780                    })
781                } else {
782                    Ok(Value::Decimal(num))
783                }
784                // check num.bits() here
785            }
786            Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
787                if max_prec_for_len(bytes.len())? < precision {
788                    Err(Error::ComparePrecisionAndSize {
789                        precision,
790                        num_bytes: bytes.len(),
791                    })
792                } else {
793                    // precision and scale match, can we assume the underlying type can hold the data?
794                    Ok(Value::Decimal(Decimal::from(bytes)))
795                }
796            }
797            other => Err(Error::ResolveDecimal(other.into())),
798        }
799    }
800
801    fn resolve_date(self) -> Result<Self, Error> {
802        match self {
803            Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
804            other => Err(Error::GetDate(other.into())),
805        }
806    }
807
808    fn resolve_time_millis(self) -> Result<Self, Error> {
809        match self {
810            Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
811            other => Err(Error::GetTimeMillis(other.into())),
812        }
813    }
814
815    fn resolve_time_micros(self) -> Result<Self, Error> {
816        match self {
817            Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
818            Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
819            other => Err(Error::GetTimeMicros(other.into())),
820        }
821    }
822
823    fn resolve_timestamp_millis(self) -> Result<Self, Error> {
824        match self {
825            Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
826            Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
827            other => Err(Error::GetTimestampMillis(other.into())),
828        }
829    }
830
831    fn resolve_timestamp_micros(self) -> Result<Self, Error> {
832        match self {
833            Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
834            Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
835            other => Err(Error::GetTimestampMicros(other.into())),
836        }
837    }
838
839    fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
840        match self {
841            Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
842            Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
843            other => Err(Error::GetTimestampNanos(other.into())),
844        }
845    }
846
847    fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
848        match self {
849            Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
850                Ok(Value::LocalTimestampMillis(ts))
851            }
852            Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
853            other => Err(Error::GetLocalTimestampMillis(other.into())),
854        }
855    }
856
857    fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
858        match self {
859            Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
860                Ok(Value::LocalTimestampMicros(ts))
861            }
862            Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
863            other => Err(Error::GetLocalTimestampMicros(other.into())),
864        }
865    }
866
867    fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
868        match self {
869            Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
870            Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
871            other => Err(Error::GetLocalTimestampNanos(other.into())),
872        }
873    }
874
875    fn resolve_null(self) -> Result<Self, Error> {
876        match self {
877            Value::Null => Ok(Value::Null),
878            other => Err(Error::GetNull(other.into())),
879        }
880    }
881
882    fn resolve_boolean(self) -> Result<Self, Error> {
883        match self {
884            Value::Boolean(b) => Ok(Value::Boolean(b)),
885            other => Err(Error::GetBoolean(other.into())),
886        }
887    }
888
889    fn resolve_int(self) -> Result<Self, Error> {
890        match self {
891            Value::Int(n) => Ok(Value::Int(n)),
892            Value::Long(n) => Ok(Value::Int(n as i32)),
893            other => Err(Error::GetInt(other.into())),
894        }
895    }
896
897    fn resolve_long(self) -> Result<Self, Error> {
898        match self {
899            Value::Int(n) => Ok(Value::Long(i64::from(n))),
900            Value::Long(n) => Ok(Value::Long(n)),
901            other => Err(Error::GetLong(other.into())),
902        }
903    }
904
905    fn resolve_float(self) -> Result<Self, Error> {
906        match self {
907            Value::Int(n) => Ok(Value::Float(n as f32)),
908            Value::Long(n) => Ok(Value::Float(n as f32)),
909            Value::Float(x) => Ok(Value::Float(x)),
910            Value::Double(x) => Ok(Value::Float(x as f32)),
911            other => Err(Error::GetFloat(other.into())),
912        }
913    }
914
915    fn resolve_double(self) -> Result<Self, Error> {
916        match self {
917            Value::Int(n) => Ok(Value::Double(f64::from(n))),
918            Value::Long(n) => Ok(Value::Double(n as f64)),
919            Value::Float(x) => Ok(Value::Double(f64::from(x))),
920            Value::Double(x) => Ok(Value::Double(x)),
921            other => Err(Error::GetDouble(other.into())),
922        }
923    }
924
925    fn resolve_bytes(self) -> Result<Self, Error> {
926        match self {
927            Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
928            Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
929            Value::Array(items) => Ok(Value::Bytes(
930                items
931                    .into_iter()
932                    .map(Value::try_u8)
933                    .collect::<Result<Vec<_>, _>>()?,
934            )),
935            other => Err(Error::GetBytes(other.into())),
936        }
937    }
938
939    fn resolve_string(self) -> Result<Self, Error> {
940        match self {
941            Value::String(s) => Ok(Value::String(s)),
942            Value::Bytes(bytes) | Value::Fixed(_, bytes) => Ok(Value::String(
943                String::from_utf8(bytes).map_err(Error::ConvertToUtf8)?,
944            )),
945            other => Err(Error::GetString(other.into())),
946        }
947    }
948
949    fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
950        match self {
951            Value::Fixed(n, bytes) => {
952                if n == size {
953                    Ok(Value::Fixed(n, bytes))
954                } else {
955                    Err(Error::CompareFixedSizes { size, n })
956                }
957            }
958            Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())),
959            Value::Bytes(s) => {
960                if s.len() == size {
961                    Ok(Value::Fixed(size, s))
962                } else {
963                    Err(Error::CompareFixedSizes { size, n: s.len() })
964                }
965            }
966            other => Err(Error::GetStringForFixed(other.into())),
967        }
968    }
969
970    pub(crate) fn resolve_enum(
971        self,
972        symbols: &[String],
973        enum_default: &Option<String>,
974        _field_default: &Option<JsonValue>,
975    ) -> Result<Self, Error> {
976        let validate_symbol = |symbol: String, symbols: &[String]| {
977            if let Some(index) = symbols.iter().position(|item| item == &symbol) {
978                Ok(Value::Enum(index as u32, symbol))
979            } else {
980                match enum_default {
981                    Some(default) => {
982                        if let Some(index) = symbols.iter().position(|item| item == default) {
983                            Ok(Value::Enum(index as u32, default.clone()))
984                        } else {
985                            Err(Error::GetEnumDefault {
986                                symbol,
987                                symbols: symbols.into(),
988                            })
989                        }
990                    }
991                    _ => Err(Error::GetEnumDefault {
992                        symbol,
993                        symbols: symbols.into(),
994                    }),
995                }
996            }
997        };
998
999        match self {
1000            Value::Enum(_raw_index, s) => validate_symbol(s, symbols),
1001            Value::String(s) => validate_symbol(s, symbols),
1002            other => Err(Error::GetEnum(other.into())),
1003        }
1004    }
1005
1006    fn resolve_union<S: Borrow<Schema> + Debug>(
1007        self,
1008        schema: &UnionSchema,
1009        names: &HashMap<Name, S>,
1010        enclosing_namespace: &Namespace,
1011        field_default: &Option<JsonValue>,
1012    ) -> Result<Self, Error> {
1013        let v = match self {
1014            // Both are unions case.
1015            Value::Union(_i, v) => *v,
1016            // Reader is a union, but writer is not.
1017            v => v,
1018        };
1019        let (i, inner) = schema
1020            .find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
1021            .ok_or(Error::FindUnionVariant)?;
1022
1023        Ok(Value::Union(
1024            i as u32,
1025            Box::new(v.resolve_internal(inner, names, enclosing_namespace, field_default)?),
1026        ))
1027    }
1028
1029    fn resolve_array<S: Borrow<Schema> + Debug>(
1030        self,
1031        schema: &Schema,
1032        names: &HashMap<Name, S>,
1033        enclosing_namespace: &Namespace,
1034    ) -> Result<Self, Error> {
1035        match self {
1036            Value::Array(items) => Ok(Value::Array(
1037                items
1038                    .into_iter()
1039                    .map(|item| item.resolve_internal(schema, names, enclosing_namespace, &None))
1040                    .collect::<Result<_, _>>()?,
1041            )),
1042            other => Err(Error::GetArray {
1043                expected: schema.into(),
1044                other: other.into(),
1045            }),
1046        }
1047    }
1048
1049    fn resolve_map<S: Borrow<Schema> + Debug>(
1050        self,
1051        schema: &Schema,
1052        names: &HashMap<Name, S>,
1053        enclosing_namespace: &Namespace,
1054    ) -> Result<Self, Error> {
1055        match self {
1056            Value::Map(items) => Ok(Value::Map(
1057                items
1058                    .into_iter()
1059                    .map(|(key, value)| {
1060                        value
1061                            .resolve_internal(schema, names, enclosing_namespace, &None)
1062                            .map(|value| (key, value))
1063                    })
1064                    .collect::<Result<_, _>>()?,
1065            )),
1066            other => Err(Error::GetMap {
1067                expected: schema.into(),
1068                other: other.into(),
1069            }),
1070        }
1071    }
1072
1073    fn resolve_record<S: Borrow<Schema> + Debug>(
1074        self,
1075        fields: &[RecordField],
1076        names: &HashMap<Name, S>,
1077        enclosing_namespace: &Namespace,
1078    ) -> Result<Self, Error> {
1079        let mut items = match self {
1080            Value::Map(items) => Ok(items),
1081            Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
1082            other => Err(Error::GetRecord {
1083                expected: fields
1084                    .iter()
1085                    .map(|field| (field.name.clone(), field.schema.clone().into()))
1086                    .collect(),
1087                other: other.into(),
1088            }),
1089        }?;
1090
1091        let new_fields = fields
1092            .iter()
1093            .map(|field| {
1094                let value = match items.remove(&field.name) {
1095                    Some(value) => value,
1096                    None => match field.default {
1097                        Some(ref value) => match field.schema {
1098                            Schema::Enum(EnumSchema {
1099                                ref symbols,
1100                                ref default,
1101                                ..
1102                            }) => Value::from(value.clone()).resolve_enum(
1103                                symbols,
1104                                default,
1105                                &field.default.clone(),
1106                            )?,
1107                            Schema::Union(ref union_schema) => {
1108                                let first = &union_schema.variants()[0];
1109                                // NOTE: this match exists only to optimize null defaults for large
1110                                // backward-compatible schemas with many nullable fields
1111                                match first {
1112                                    Schema::Null => Value::Union(0, Box::new(Value::Null)),
1113                                    _ => Value::Union(
1114                                        0,
1115                                        Box::new(Value::from(value.clone()).resolve_internal(
1116                                            first,
1117                                            names,
1118                                            enclosing_namespace,
1119                                            &field.default,
1120                                        )?),
1121                                    ),
1122                                }
1123                            }
1124                            _ => Value::from(value.clone()),
1125                        },
1126                        None => {
1127                            return Err(Error::GetField(field.name.clone()));
1128                        }
1129                    },
1130                };
1131                value
1132                    .resolve_internal(&field.schema, names, enclosing_namespace, &field.default)
1133                    .map(|value| (field.name.clone(), value))
1134            })
1135            .collect::<Result<Vec<_>, _>>()?;
1136
1137        Ok(Value::Record(new_fields))
1138    }
1139
1140    fn try_u8(self) -> AvroResult<u8> {
1141        let int = self.resolve(&Schema::Int)?;
1142        if let Value::Int(n) = int {
1143            if n >= 0 && n <= i32::from(u8::MAX) {
1144                return Ok(n as u8);
1145            }
1146        }
1147
1148        Err(Error::GetU8(int.into()))
1149    }
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154    use super::*;
1155    use crate::{
1156        duration::{Days, Millis, Months},
1157        schema::RecordFieldOrder,
1158    };
1159    use apache_avro_test_helper::{
1160        logger::{assert_logged, assert_not_logged},
1161        TestResult,
1162    };
1163    use num_bigint::BigInt;
1164    use pretty_assertions::assert_eq;
1165    use serde_json::json;
1166
1167    #[test]
1168    fn avro_3809_validate_nested_records_with_implicit_namespace() -> TestResult {
1169        let schema = Schema::parse_str(
1170            r#"{
1171            "name": "record_name",
1172            "namespace": "space",
1173            "type": "record",
1174            "fields": [
1175              {
1176                "name": "outer_field_1",
1177                "type": {
1178                  "type": "record",
1179                  "name": "middle_record_name",
1180                  "namespace": "middle_namespace",
1181                  "fields": [
1182                    {
1183                      "name": "middle_field_1",
1184                      "type": {
1185                        "type": "record",
1186                        "name": "inner_record_name",
1187                        "fields": [
1188                          { "name": "inner_field_1", "type": "double" }
1189                        ]
1190                      }
1191                    },
1192                    { "name": "middle_field_2", "type": "inner_record_name" }
1193                  ]
1194                }
1195              }
1196            ]
1197          }"#,
1198        )?;
1199        let value = Value::Record(vec![(
1200            "outer_field_1".into(),
1201            Value::Record(vec![
1202                (
1203                    "middle_field_1".into(),
1204                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.2f64))]),
1205                ),
1206                (
1207                    "middle_field_2".into(),
1208                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.6f64))]),
1209                ),
1210            ]),
1211        )]);
1212
1213        assert!(value.validate(&schema));
1214        Ok(())
1215    }
1216
1217    #[test]
1218    fn validate() -> TestResult {
1219        let value_schema_valid = vec![
1220            (Value::Int(42), Schema::Int, true, ""),
1221            (Value::Int(43), Schema::Long, true, ""),
1222            (Value::Float(43.2), Schema::Float, true, ""),
1223            (Value::Float(45.9), Schema::Double, true, ""),
1224            (
1225                Value::Int(42),
1226                Schema::Boolean,
1227                false,
1228                "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination! Value: Int(42), schema: Boolean",
1229            ),
1230            (
1231                Value::Union(0, Box::new(Value::Null)),
1232                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1233                true,
1234                "",
1235            ),
1236            (
1237                Value::Union(1, Box::new(Value::Int(42))),
1238                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1239                true,
1240                "",
1241            ),
1242            (
1243                Value::Union(0, Box::new(Value::Null)),
1244                Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int])?),
1245                false,
1246                "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination! Value: Null, schema: Double",
1247            ),
1248            (
1249                Value::Union(3, Box::new(Value::Int(42))),
1250                Schema::Union(
1251                    UnionSchema::new(vec![
1252                        Schema::Null,
1253                        Schema::Double,
1254                        Schema::String,
1255                        Schema::Int,
1256                    ])
1257                    ?,
1258                ),
1259                true,
1260                "",
1261            ),
1262            (
1263                Value::Union(1, Box::new(Value::Long(42i64))),
1264                Schema::Union(
1265                    UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis])?,
1266                ),
1267                true,
1268                "",
1269            ),
1270            (
1271                Value::Union(2, Box::new(Value::Long(1_i64))),
1272                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1273                false,
1274                "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
1275            ),
1276            (
1277                Value::Array(vec![Value::Long(42i64)]),
1278                Schema::array(Schema::Long),
1279                true,
1280                "",
1281            ),
1282            (
1283                Value::Array(vec![Value::Boolean(true)]),
1284                Schema::array(Schema::Long),
1285                false,
1286                "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long",
1287            ),
1288            (Value::Record(vec![]), Schema::Null, false, "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination! Value: Record([]), schema: Null"),
1289            (
1290                Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
1291                Schema::Duration,
1292                true,
1293                "",
1294            ),
1295            (
1296                Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1297                Schema::Duration,
1298                false,
1299                "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be a Duration",
1300            ),
1301            (
1302                Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
1303                Schema::Record(RecordSchema {
1304                    name: Name::new("record_name").unwrap(),
1305                    aliases: None,
1306                    doc: None,
1307                    fields: vec![RecordField {
1308                        name: "field_name".to_string(),
1309                        doc: None,
1310                        default: None,
1311                        aliases: None,
1312                        schema: Schema::Int,
1313                        order: RecordFieldOrder::Ignore,
1314                        position: 0,
1315                        custom_attributes: Default::default(),
1316                    }],
1317                    lookup: Default::default(),
1318                    attributes: Default::default(),
1319                }),
1320                false,
1321                r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }). Reason: There is no schema field for field 'unknown_field_name'"#,
1322            ),
1323            (
1324                Value::Record(vec![("field_name".to_string(), Value::Null)]),
1325                Schema::Record(RecordSchema {
1326                    name: Name::new("record_name").unwrap(),
1327                    aliases: None,
1328                    doc: None,
1329                    fields: vec![RecordField {
1330                        name: "field_name".to_string(),
1331                        doc: None,
1332                        default: None,
1333                        aliases: None,
1334                        schema: Schema::Ref {
1335                            name: Name::new("missing").unwrap(),
1336                        },
1337                        order: RecordFieldOrder::Ignore,
1338                        position: 0,
1339                        custom_attributes: Default::default(),
1340                    }],
1341                    lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
1342                    attributes: Default::default(),
1343                }),
1344                false,
1345                r#"Invalid value: Record([("field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }). Reason: Unresolved schema reference: 'Name { name: "missing", namespace: None }'. Parsed names: []"#,
1346            ),
1347        ];
1348
1349        for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
1350            let err_message =
1351                value.validate_internal::<Schema>(&schema, &HashMap::default(), &None);
1352            assert_eq!(valid, err_message.is_none());
1353            if !valid {
1354                let full_err_message = format!(
1355                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
1356                    value,
1357                    schema,
1358                    err_message.unwrap()
1359                );
1360                assert_eq!(expected_err_message, full_err_message);
1361            }
1362        }
1363
1364        Ok(())
1365    }
1366
1367    #[test]
1368    fn validate_fixed() -> TestResult {
1369        let schema = Schema::Fixed(FixedSchema {
1370            size: 4,
1371            name: Name::new("some_fixed").unwrap(),
1372            aliases: None,
1373            doc: None,
1374            default: None,
1375            attributes: Default::default(),
1376        });
1377
1378        assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
1379        let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
1380        assert!(!value.validate(&schema));
1381        assert_logged(
1382            format!(
1383                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1384                value, schema, "The value's size (5) is different than the schema's size (4)"
1385            )
1386            .as_str(),
1387        );
1388
1389        assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
1390        let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
1391        assert!(!value.validate(&schema));
1392        assert_logged(
1393            format!(
1394                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1395                value, schema, "The bytes' length (5) is different than the schema's size (4)"
1396            )
1397            .as_str(),
1398        );
1399
1400        Ok(())
1401    }
1402
1403    #[test]
1404    fn validate_enum() -> TestResult {
1405        let schema = Schema::Enum(EnumSchema {
1406            name: Name::new("some_enum").unwrap(),
1407            aliases: None,
1408            doc: None,
1409            symbols: vec![
1410                "spades".to_string(),
1411                "hearts".to_string(),
1412                "diamonds".to_string(),
1413                "clubs".to_string(),
1414            ],
1415            default: None,
1416            attributes: Default::default(),
1417        });
1418
1419        assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
1420        assert!(Value::String("spades".to_string()).validate(&schema));
1421
1422        let value = Value::Enum(1, "spades".to_string());
1423        assert!(!value.validate(&schema));
1424        assert_logged(
1425            format!(
1426                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1427                value, schema, "Symbol 'spades' is not at position '1'"
1428            )
1429            .as_str(),
1430        );
1431
1432        let value = Value::Enum(1000, "spades".to_string());
1433        assert!(!value.validate(&schema));
1434        assert_logged(
1435            format!(
1436                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1437                value, schema, "No symbol at position '1000'"
1438            )
1439            .as_str(),
1440        );
1441
1442        let value = Value::String("lorem".to_string());
1443        assert!(!value.validate(&schema));
1444        assert_logged(
1445            format!(
1446                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1447                value, schema, "'lorem' is not a member of the possible symbols"
1448            )
1449            .as_str(),
1450        );
1451
1452        let other_schema = Schema::Enum(EnumSchema {
1453            name: Name::new("some_other_enum").unwrap(),
1454            aliases: None,
1455            doc: None,
1456            symbols: vec![
1457                "hearts".to_string(),
1458                "diamonds".to_string(),
1459                "clubs".to_string(),
1460                "spades".to_string(),
1461            ],
1462            default: None,
1463            attributes: Default::default(),
1464        });
1465
1466        let value = Value::Enum(0, "spades".to_string());
1467        assert!(!value.validate(&other_schema));
1468        assert_logged(
1469            format!(
1470                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1471                value, other_schema, "Symbol 'spades' is not at position '0'"
1472            )
1473            .as_str(),
1474        );
1475
1476        Ok(())
1477    }
1478
1479    #[test]
1480    fn validate_record() -> TestResult {
1481        // {
1482        //    "type": "record",
1483        //    "fields": [
1484        //      {"type": "long", "name": "a"},
1485        //      {"type": "string", "name": "b"},
1486        //      {
1487        //          "type": ["null", "int"]
1488        //          "name": "c",
1489        //          "default": null
1490        //      }
1491        //    ]
1492        // }
1493        let schema = Schema::Record(RecordSchema {
1494            name: Name::new("some_record").unwrap(),
1495            aliases: None,
1496            doc: None,
1497            fields: vec![
1498                RecordField {
1499                    name: "a".to_string(),
1500                    doc: None,
1501                    default: None,
1502                    aliases: None,
1503                    schema: Schema::Long,
1504                    order: RecordFieldOrder::Ascending,
1505                    position: 0,
1506                    custom_attributes: Default::default(),
1507                },
1508                RecordField {
1509                    name: "b".to_string(),
1510                    doc: None,
1511                    default: None,
1512                    aliases: None,
1513                    schema: Schema::String,
1514                    order: RecordFieldOrder::Ascending,
1515                    position: 1,
1516                    custom_attributes: Default::default(),
1517                },
1518                RecordField {
1519                    name: "c".to_string(),
1520                    doc: None,
1521                    default: Some(JsonValue::Null),
1522                    aliases: None,
1523                    schema: Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1524                    order: RecordFieldOrder::Ascending,
1525                    position: 2,
1526                    custom_attributes: Default::default(),
1527                },
1528            ],
1529            lookup: [
1530                ("a".to_string(), 0),
1531                ("b".to_string(), 1),
1532                ("c".to_string(), 2),
1533            ]
1534            .iter()
1535            .cloned()
1536            .collect(),
1537            attributes: Default::default(),
1538        });
1539
1540        assert!(Value::Record(vec![
1541            ("a".to_string(), Value::Long(42i64)),
1542            ("b".to_string(), Value::String("foo".to_string())),
1543        ])
1544        .validate(&schema));
1545
1546        let value = Value::Record(vec![
1547            ("b".to_string(), Value::String("foo".to_string())),
1548            ("a".to_string(), Value::Long(42i64)),
1549        ]);
1550        assert!(value.validate(&schema));
1551
1552        let value = Value::Record(vec![
1553            ("a".to_string(), Value::Boolean(false)),
1554            ("b".to_string(), Value::String("foo".to_string())),
1555        ]);
1556        assert!(!value.validate(&schema));
1557        assert_logged(
1558            r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(false), schema: Long"#,
1559        );
1560
1561        let value = Value::Record(vec![
1562            ("a".to_string(), Value::Long(42i64)),
1563            ("c".to_string(), Value::String("foo".to_string())),
1564        ]);
1565        assert!(!value.validate(&schema));
1566        assert_logged(
1567            r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Could not find matching type in union"#,
1568        );
1569        assert_not_logged(
1570            r#"Invalid value: String("foo") for schema: Int. Reason: Unsupported value-schema combination"#,
1571        );
1572
1573        let value = Value::Record(vec![
1574            ("a".to_string(), Value::Long(42i64)),
1575            ("d".to_string(), Value::String("foo".to_string())),
1576        ]);
1577        assert!(!value.validate(&schema));
1578        assert_logged(
1579            r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: There is no schema field for field 'd'"#,
1580        );
1581
1582        let value = Value::Record(vec![
1583            ("a".to_string(), Value::Long(42i64)),
1584            ("b".to_string(), Value::String("foo".to_string())),
1585            ("c".to_string(), Value::Null),
1586            ("d".to_string(), Value::Null),
1587        ]);
1588        assert!(!value.validate(&schema));
1589        assert_logged(
1590            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: The value's records length (4) is greater than the schema's (3 fields)"#,
1591        );
1592
1593        assert!(Value::Map(
1594            vec![
1595                ("a".to_string(), Value::Long(42i64)),
1596                ("b".to_string(), Value::String("foo".to_string())),
1597            ]
1598            .into_iter()
1599            .collect()
1600        )
1601        .validate(&schema));
1602
1603        assert!(!Value::Map(
1604            vec![("d".to_string(), Value::Long(123_i64)),]
1605                .into_iter()
1606                .collect()
1607        )
1608        .validate(&schema));
1609        assert_logged(
1610            r#"Invalid value: Map({"d": Long(123)}) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Field with name '"a"' is not a member of the map items
1611Field with name '"b"' is not a member of the map items"#,
1612        );
1613
1614        let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema])?);
1615
1616        assert!(Value::Union(
1617            1,
1618            Box::new(Value::Record(vec![
1619                ("a".to_string(), Value::Long(42i64)),
1620                ("b".to_string(), Value::String("foo".to_string())),
1621            ]))
1622        )
1623        .validate(&union_schema));
1624
1625        assert!(Value::Union(
1626            1,
1627            Box::new(Value::Map(
1628                vec![
1629                    ("a".to_string(), Value::Long(42i64)),
1630                    ("b".to_string(), Value::String("foo".to_string())),
1631                ]
1632                .into_iter()
1633                .collect()
1634            ))
1635        )
1636        .validate(&union_schema));
1637
1638        Ok(())
1639    }
1640
1641    #[test]
1642    fn resolve_bytes_ok() -> TestResult {
1643        let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
1644        assert_eq!(
1645            value.resolve(&Schema::Bytes)?,
1646            Value::Bytes(vec![0u8, 42u8])
1647        );
1648
1649        Ok(())
1650    }
1651
1652    #[test]
1653    fn resolve_string_from_bytes() -> TestResult {
1654        let value = Value::Bytes(vec![97, 98, 99]);
1655        assert_eq!(
1656            value.resolve(&Schema::String)?,
1657            Value::String("abc".to_string())
1658        );
1659
1660        Ok(())
1661    }
1662
1663    #[test]
1664    fn resolve_string_from_fixed() -> TestResult {
1665        let value = Value::Fixed(3, vec![97, 98, 99]);
1666        assert_eq!(
1667            value.resolve(&Schema::String)?,
1668            Value::String("abc".to_string())
1669        );
1670
1671        Ok(())
1672    }
1673
1674    #[test]
1675    fn resolve_bytes_failure() {
1676        let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
1677        assert!(value.resolve(&Schema::Bytes).is_err());
1678    }
1679
1680    #[test]
1681    fn resolve_decimal_bytes() -> TestResult {
1682        let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1683        value.clone().resolve(&Schema::Decimal(DecimalSchema {
1684            precision: 10,
1685            scale: 4,
1686            inner: Box::new(Schema::Bytes),
1687        }))?;
1688        assert!(value.resolve(&Schema::String).is_err());
1689
1690        Ok(())
1691    }
1692
1693    #[test]
1694    fn resolve_decimal_invalid_scale() {
1695        let value = Value::Decimal(Decimal::from(vec![1, 2]));
1696        assert!(value
1697            .resolve(&Schema::Decimal(DecimalSchema {
1698                precision: 2,
1699                scale: 3,
1700                inner: Box::new(Schema::Bytes),
1701            }))
1702            .is_err());
1703    }
1704
1705    #[test]
1706    fn resolve_decimal_invalid_precision_for_length() {
1707        let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
1708        assert!(value
1709            .resolve(&Schema::Decimal(DecimalSchema {
1710                precision: 1,
1711                scale: 0,
1712                inner: Box::new(Schema::Bytes),
1713            }))
1714            .is_ok());
1715    }
1716
1717    #[test]
1718    fn resolve_decimal_fixed() {
1719        let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1720        assert!(value
1721            .clone()
1722            .resolve(&Schema::Decimal(DecimalSchema {
1723                precision: 10,
1724                scale: 1,
1725                inner: Box::new(Schema::Fixed(FixedSchema {
1726                    name: Name::new("decimal").unwrap(),
1727                    aliases: None,
1728                    size: 20,
1729                    doc: None,
1730                    default: None,
1731                    attributes: Default::default(),
1732                }))
1733            }))
1734            .is_ok());
1735        assert!(value.resolve(&Schema::String).is_err());
1736    }
1737
1738    #[test]
1739    fn resolve_date() {
1740        let value = Value::Date(2345);
1741        assert!(value.clone().resolve(&Schema::Date).is_ok());
1742        assert!(value.resolve(&Schema::String).is_err());
1743    }
1744
1745    #[test]
1746    fn resolve_time_millis() {
1747        let value = Value::TimeMillis(10);
1748        assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1749        assert!(value.resolve(&Schema::TimeMicros).is_err());
1750    }
1751
1752    #[test]
1753    fn resolve_time_micros() {
1754        let value = Value::TimeMicros(10);
1755        assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1756        assert!(value.resolve(&Schema::TimeMillis).is_err());
1757    }
1758
1759    #[test]
1760    fn resolve_timestamp_millis() {
1761        let value = Value::TimestampMillis(10);
1762        assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1763        assert!(value.resolve(&Schema::Float).is_err());
1764
1765        let value = Value::Float(10.0f32);
1766        assert!(value.resolve(&Schema::TimestampMillis).is_err());
1767    }
1768
1769    #[test]
1770    fn resolve_timestamp_micros() {
1771        let value = Value::TimestampMicros(10);
1772        assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1773        assert!(value.resolve(&Schema::Int).is_err());
1774
1775        let value = Value::Double(10.0);
1776        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1777    }
1778
1779    #[test]
1780    fn test_avro_3914_resolve_timestamp_nanos() {
1781        let value = Value::TimestampNanos(10);
1782        assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
1783        assert!(value.resolve(&Schema::Int).is_err());
1784
1785        let value = Value::Double(10.0);
1786        assert!(value.resolve(&Schema::TimestampNanos).is_err());
1787    }
1788
1789    #[test]
1790    fn test_avro_3853_resolve_timestamp_millis() {
1791        let value = Value::LocalTimestampMillis(10);
1792        assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
1793        assert!(value.resolve(&Schema::Float).is_err());
1794
1795        let value = Value::Float(10.0f32);
1796        assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
1797    }
1798
1799    #[test]
1800    fn test_avro_3853_resolve_timestamp_micros() {
1801        let value = Value::LocalTimestampMicros(10);
1802        assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
1803        assert!(value.resolve(&Schema::Int).is_err());
1804
1805        let value = Value::Double(10.0);
1806        assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
1807    }
1808
1809    #[test]
1810    fn test_avro_3916_resolve_timestamp_nanos() {
1811        let value = Value::LocalTimestampNanos(10);
1812        assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
1813        assert!(value.resolve(&Schema::Int).is_err());
1814
1815        let value = Value::Double(10.0);
1816        assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
1817    }
1818
1819    #[test]
1820    fn resolve_duration() {
1821        let value = Value::Duration(Duration::new(
1822            Months::new(10),
1823            Days::new(5),
1824            Millis::new(3000),
1825        ));
1826        assert!(value.clone().resolve(&Schema::Duration).is_ok());
1827        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1828        assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
1829    }
1830
1831    #[test]
1832    fn resolve_uuid() -> TestResult {
1833        let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537")?);
1834        assert!(value.clone().resolve(&Schema::Uuid).is_ok());
1835        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1836
1837        Ok(())
1838    }
1839
1840    #[test]
1841    fn avro_3678_resolve_float_to_double() {
1842        let value = Value::Float(2345.1);
1843        assert!(value.resolve(&Schema::Double).is_ok());
1844    }
1845
1846    #[test]
1847    fn test_avro_3621_resolve_to_nullable_union() -> TestResult {
1848        let schema = Schema::parse_str(
1849            r#"{
1850            "type": "record",
1851            "name": "root",
1852            "fields": [
1853                {
1854                    "name": "event",
1855                    "type": [
1856                        "null",
1857                        {
1858                            "type": "record",
1859                            "name": "event",
1860                            "fields": [
1861                                {
1862                                    "name": "amount",
1863                                    "type": "int"
1864                                },
1865                                {
1866                                    "name": "size",
1867                                    "type": [
1868                                        "null",
1869                                        "int"
1870                                    ],
1871                                    "default": null
1872                                }
1873                            ]
1874                        }
1875                    ],
1876                    "default": null
1877                }
1878            ]
1879        }"#,
1880        )?;
1881
1882        let value = Value::Record(vec![(
1883            "event".to_string(),
1884            Value::Record(vec![("amount".to_string(), Value::Int(200))]),
1885        )]);
1886        assert!(value.resolve(&schema).is_ok());
1887
1888        let value = Value::Record(vec![(
1889            "event".to_string(),
1890            Value::Record(vec![("size".to_string(), Value::Int(1))]),
1891        )]);
1892        assert!(value.resolve(&schema).is_err());
1893
1894        Ok(())
1895    }
1896
1897    #[test]
1898    fn json_from_avro() -> TestResult {
1899        assert_eq!(JsonValue::try_from(Value::Null)?, JsonValue::Null);
1900        assert_eq!(
1901            JsonValue::try_from(Value::Boolean(true))?,
1902            JsonValue::Bool(true)
1903        );
1904        assert_eq!(
1905            JsonValue::try_from(Value::Int(1))?,
1906            JsonValue::Number(1.into())
1907        );
1908        assert_eq!(
1909            JsonValue::try_from(Value::Long(1))?,
1910            JsonValue::Number(1.into())
1911        );
1912        assert_eq!(
1913            JsonValue::try_from(Value::Float(1.0))?,
1914            JsonValue::Number(Number::from_f64(1.0).unwrap())
1915        );
1916        assert_eq!(
1917            JsonValue::try_from(Value::Double(1.0))?,
1918            JsonValue::Number(Number::from_f64(1.0).unwrap())
1919        );
1920        assert_eq!(
1921            JsonValue::try_from(Value::Bytes(vec![1, 2, 3]))?,
1922            JsonValue::Array(vec![
1923                JsonValue::Number(1.into()),
1924                JsonValue::Number(2.into()),
1925                JsonValue::Number(3.into())
1926            ])
1927        );
1928        assert_eq!(
1929            JsonValue::try_from(Value::String("test".into()))?,
1930            JsonValue::String("test".into())
1931        );
1932        assert_eq!(
1933            JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3]))?,
1934            JsonValue::Array(vec![
1935                JsonValue::Number(1.into()),
1936                JsonValue::Number(2.into()),
1937                JsonValue::Number(3.into())
1938            ])
1939        );
1940        assert_eq!(
1941            JsonValue::try_from(Value::Enum(1, "test_enum".into()))?,
1942            JsonValue::String("test_enum".into())
1943        );
1944        assert_eq!(
1945            JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))?,
1946            JsonValue::String("test_enum".into())
1947        );
1948        assert_eq!(
1949            JsonValue::try_from(Value::Array(vec![
1950                Value::Int(1),
1951                Value::Int(2),
1952                Value::Int(3)
1953            ]))?,
1954            JsonValue::Array(vec![
1955                JsonValue::Number(1.into()),
1956                JsonValue::Number(2.into()),
1957                JsonValue::Number(3.into())
1958            ])
1959        );
1960        assert_eq!(
1961            JsonValue::try_from(Value::Map(
1962                vec![
1963                    ("v1".to_string(), Value::Int(1)),
1964                    ("v2".to_string(), Value::Int(2)),
1965                    ("v3".to_string(), Value::Int(3))
1966                ]
1967                .into_iter()
1968                .collect()
1969            ))?,
1970            JsonValue::Object(
1971                vec![
1972                    ("v1".to_string(), JsonValue::Number(1.into())),
1973                    ("v2".to_string(), JsonValue::Number(2.into())),
1974                    ("v3".to_string(), JsonValue::Number(3.into()))
1975                ]
1976                .into_iter()
1977                .collect()
1978            )
1979        );
1980        assert_eq!(
1981            JsonValue::try_from(Value::Record(vec![
1982                ("v1".to_string(), Value::Int(1)),
1983                ("v2".to_string(), Value::Int(2)),
1984                ("v3".to_string(), Value::Int(3))
1985            ]))?,
1986            JsonValue::Object(
1987                vec![
1988                    ("v1".to_string(), JsonValue::Number(1.into())),
1989                    ("v2".to_string(), JsonValue::Number(2.into())),
1990                    ("v3".to_string(), JsonValue::Number(3.into()))
1991                ]
1992                .into_iter()
1993                .collect()
1994            )
1995        );
1996        assert_eq!(
1997            JsonValue::try_from(Value::Date(1))?,
1998            JsonValue::Number(1.into())
1999        );
2000        assert_eq!(
2001            JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into()))?,
2002            JsonValue::Array(vec![
2003                JsonValue::Number(1.into()),
2004                JsonValue::Number(2.into()),
2005                JsonValue::Number(3.into())
2006            ])
2007        );
2008        assert_eq!(
2009            JsonValue::try_from(Value::TimeMillis(1))?,
2010            JsonValue::Number(1.into())
2011        );
2012        assert_eq!(
2013            JsonValue::try_from(Value::TimeMicros(1))?,
2014            JsonValue::Number(1.into())
2015        );
2016        assert_eq!(
2017            JsonValue::try_from(Value::TimestampMillis(1))?,
2018            JsonValue::Number(1.into())
2019        );
2020        assert_eq!(
2021            JsonValue::try_from(Value::TimestampMicros(1))?,
2022            JsonValue::Number(1.into())
2023        );
2024        assert_eq!(
2025            JsonValue::try_from(Value::TimestampNanos(1))?,
2026            JsonValue::Number(1.into())
2027        );
2028        assert_eq!(
2029            JsonValue::try_from(Value::LocalTimestampMillis(1))?,
2030            JsonValue::Number(1.into())
2031        );
2032        assert_eq!(
2033            JsonValue::try_from(Value::LocalTimestampMicros(1))?,
2034            JsonValue::Number(1.into())
2035        );
2036        assert_eq!(
2037            JsonValue::try_from(Value::LocalTimestampNanos(1))?,
2038            JsonValue::Number(1.into())
2039        );
2040        assert_eq!(
2041            JsonValue::try_from(Value::Duration(
2042                [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
2043            ))?,
2044            JsonValue::Array(vec![
2045                JsonValue::Number(1.into()),
2046                JsonValue::Number(2.into()),
2047                JsonValue::Number(3.into()),
2048                JsonValue::Number(4.into()),
2049                JsonValue::Number(5.into()),
2050                JsonValue::Number(6.into()),
2051                JsonValue::Number(7.into()),
2052                JsonValue::Number(8.into()),
2053                JsonValue::Number(9.into()),
2054                JsonValue::Number(10.into()),
2055                JsonValue::Number(11.into()),
2056                JsonValue::Number(12.into()),
2057            ])
2058        );
2059        assert_eq!(
2060            JsonValue::try_from(Value::Uuid(Uuid::parse_str(
2061                "936DA01F-9ABD-4D9D-80C7-02AF85C822A8"
2062            )?))?,
2063            JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
2064        );
2065
2066        Ok(())
2067    }
2068
2069    #[test]
2070    fn test_avro_3433_recursive_resolves_record() -> TestResult {
2071        let schema = Schema::parse_str(
2072            r#"
2073        {
2074            "type":"record",
2075            "name":"TestStruct",
2076            "fields": [
2077                {
2078                    "name":"a",
2079                    "type":{
2080                        "type":"record",
2081                        "name": "Inner",
2082                        "fields": [ {
2083                            "name":"z",
2084                            "type":"int"
2085                        }]
2086                    }
2087                },
2088                {
2089                    "name":"b",
2090                    "type":"Inner"
2091                }
2092            ]
2093        }"#,
2094        )?;
2095
2096        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2097        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2098        let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2099        outer
2100            .resolve(&schema)
2101            .expect("Record definition defined in one field must be available in other field");
2102
2103        Ok(())
2104    }
2105
2106    #[test]
2107    fn test_avro_3433_recursive_resolves_array() -> TestResult {
2108        let schema = Schema::parse_str(
2109            r#"
2110        {
2111            "type":"record",
2112            "name":"TestStruct",
2113            "fields": [
2114                {
2115                    "name":"a",
2116                    "type":{
2117                        "type":"array",
2118                        "items": {
2119                            "type":"record",
2120                            "name": "Inner",
2121                            "fields": [ {
2122                                "name":"z",
2123                                "type":"int"
2124                            }]
2125                        }
2126                    }
2127                },
2128                {
2129                    "name":"b",
2130                    "type": {
2131                        "type":"map",
2132                        "values":"Inner"
2133                    }
2134                }
2135            ]
2136        }"#,
2137        )?;
2138
2139        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2140        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2141        let outer_value = Value::Record(vec![
2142            ("a".into(), Value::Array(vec![inner_value1])),
2143            (
2144                "b".into(),
2145                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2146            ),
2147        ]);
2148        outer_value
2149            .resolve(&schema)
2150            .expect("Record defined in array definition must be resolvable from map");
2151
2152        Ok(())
2153    }
2154
2155    #[test]
2156    fn test_avro_3433_recursive_resolves_map() -> TestResult {
2157        let schema = Schema::parse_str(
2158            r#"
2159        {
2160            "type":"record",
2161            "name":"TestStruct",
2162            "fields": [
2163                {
2164                    "name":"a",
2165                    "type":{
2166                        "type":"record",
2167                        "name": "Inner",
2168                        "fields": [ {
2169                            "name":"z",
2170                            "type":"int"
2171                        }]
2172                    }
2173                },
2174                {
2175                    "name":"b",
2176                    "type": {
2177                        "type":"map",
2178                        "values":"Inner"
2179                    }
2180                }
2181            ]
2182        }"#,
2183        )?;
2184
2185        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2186        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2187        let outer_value = Value::Record(vec![
2188            ("a".into(), inner_value1),
2189            (
2190                "b".into(),
2191                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2192            ),
2193        ]);
2194        outer_value
2195            .resolve(&schema)
2196            .expect("Record defined in record field must be resolvable from map field");
2197
2198        Ok(())
2199    }
2200
2201    #[test]
2202    fn test_avro_3433_recursive_resolves_record_wrapper() -> TestResult {
2203        let schema = Schema::parse_str(
2204            r#"
2205        {
2206            "type":"record",
2207            "name":"TestStruct",
2208            "fields": [
2209                {
2210                    "name":"a",
2211                    "type":{
2212                        "type":"record",
2213                        "name": "Inner",
2214                        "fields": [ {
2215                            "name":"z",
2216                            "type":"int"
2217                        }]
2218                    }
2219                },
2220                {
2221                    "name":"b",
2222                    "type": {
2223                        "type":"record",
2224                        "name": "InnerWrapper",
2225                        "fields": [ {
2226                            "name":"j",
2227                            "type":"Inner"
2228                        }]
2229                    }
2230                }
2231            ]
2232        }"#,
2233        )?;
2234
2235        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2236        let inner_value2 = Value::Record(vec![(
2237            "j".into(),
2238            Value::Record(vec![("z".into(), Value::Int(6))]),
2239        )]);
2240        let outer_value =
2241            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2242        outer_value.resolve(&schema).expect("Record schema defined in field must be resolvable in Record schema defined in other field");
2243
2244        Ok(())
2245    }
2246
2247    #[test]
2248    fn test_avro_3433_recursive_resolves_map_and_array() -> TestResult {
2249        let schema = Schema::parse_str(
2250            r#"
2251        {
2252            "type":"record",
2253            "name":"TestStruct",
2254            "fields": [
2255                {
2256                    "name":"a",
2257                    "type":{
2258                        "type":"map",
2259                        "values": {
2260                            "type":"record",
2261                            "name": "Inner",
2262                            "fields": [ {
2263                                "name":"z",
2264                                "type":"int"
2265                            }]
2266                        }
2267                    }
2268                },
2269                {
2270                    "name":"b",
2271                    "type": {
2272                        "type":"array",
2273                        "items":"Inner"
2274                    }
2275                }
2276            ]
2277        }"#,
2278        )?;
2279
2280        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2281        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2282        let outer_value = Value::Record(vec![
2283            (
2284                "a".into(),
2285                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2286            ),
2287            ("b".into(), Value::Array(vec![inner_value1])),
2288        ]);
2289        outer_value
2290            .resolve(&schema)
2291            .expect("Record defined in map definition must be resolvable from array");
2292
2293        Ok(())
2294    }
2295
2296    #[test]
2297    fn test_avro_3433_recursive_resolves_union() -> TestResult {
2298        let schema = Schema::parse_str(
2299            r#"
2300        {
2301            "type":"record",
2302            "name":"TestStruct",
2303            "fields": [
2304                {
2305                    "name":"a",
2306                    "type":["null", {
2307                        "type":"record",
2308                        "name": "Inner",
2309                        "fields": [ {
2310                            "name":"z",
2311                            "type":"int"
2312                        }]
2313                    }]
2314                },
2315                {
2316                    "name":"b",
2317                    "type":"Inner"
2318                }
2319            ]
2320        }"#,
2321        )?;
2322
2323        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2324        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2325        let outer1 = Value::Record(vec![
2326            ("a".into(), inner_value1),
2327            ("b".into(), inner_value2.clone()),
2328        ]);
2329        outer1
2330            .resolve(&schema)
2331            .expect("Record definition defined in union must be resolved in other field");
2332        let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]);
2333        outer2
2334            .resolve(&schema)
2335            .expect("Record definition defined in union must be resolved in other field");
2336
2337        Ok(())
2338    }
2339
2340    #[test]
2341    fn test_avro_3461_test_multi_level_resolve_outer_namespace() -> TestResult {
2342        let schema = r#"
2343        {
2344          "name": "record_name",
2345          "namespace": "space",
2346          "type": "record",
2347          "fields": [
2348            {
2349              "name": "outer_field_1",
2350              "type": [
2351                        "null",
2352                        {
2353                            "type": "record",
2354                            "name": "middle_record_name",
2355                            "fields":[
2356                                {
2357                                    "name":"middle_field_1",
2358                                    "type":[
2359                                        "null",
2360                                        {
2361                                            "type":"record",
2362                                            "name":"inner_record_name",
2363                                            "fields":[
2364                                                {
2365                                                    "name":"inner_field_1",
2366                                                    "type":"double"
2367                                                }
2368                                            ]
2369                                        }
2370                                    ]
2371                                }
2372                            ]
2373                        }
2374                    ]
2375            },
2376            {
2377                "name": "outer_field_2",
2378                "type" : "space.inner_record_name"
2379            }
2380          ]
2381        }
2382        "#;
2383        let schema = Schema::parse_str(schema)?;
2384        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2385        let middle_record_variation_1 = Value::Record(vec![(
2386            "middle_field_1".into(),
2387            Value::Union(0, Box::new(Value::Null)),
2388        )]);
2389        let middle_record_variation_2 = Value::Record(vec![(
2390            "middle_field_1".into(),
2391            Value::Union(1, Box::new(inner_record.clone())),
2392        )]);
2393        let outer_record_variation_1 = Value::Record(vec![
2394            (
2395                "outer_field_1".into(),
2396                Value::Union(0, Box::new(Value::Null)),
2397            ),
2398            ("outer_field_2".into(), inner_record.clone()),
2399        ]);
2400        let outer_record_variation_2 = Value::Record(vec![
2401            (
2402                "outer_field_1".into(),
2403                Value::Union(1, Box::new(middle_record_variation_1)),
2404            ),
2405            ("outer_field_2".into(), inner_record.clone()),
2406        ]);
2407        let outer_record_variation_3 = Value::Record(vec![
2408            (
2409                "outer_field_1".into(),
2410                Value::Union(1, Box::new(middle_record_variation_2)),
2411            ),
2412            ("outer_field_2".into(), inner_record),
2413        ]);
2414
2415        outer_record_variation_1
2416            .resolve(&schema)
2417            .expect("Should be able to resolve value to the schema that is it's definition");
2418        outer_record_variation_2
2419            .resolve(&schema)
2420            .expect("Should be able to resolve value to the schema that is it's definition");
2421        outer_record_variation_3
2422            .resolve(&schema)
2423            .expect("Should be able to resolve value to the schema that is it's definition");
2424
2425        Ok(())
2426    }
2427
2428    #[test]
2429    fn test_avro_3461_test_multi_level_resolve_middle_namespace() -> TestResult {
2430        let schema = r#"
2431        {
2432          "name": "record_name",
2433          "namespace": "space",
2434          "type": "record",
2435          "fields": [
2436            {
2437              "name": "outer_field_1",
2438              "type": [
2439                        "null",
2440                        {
2441                            "type": "record",
2442                            "name": "middle_record_name",
2443                            "namespace":"middle_namespace",
2444                            "fields":[
2445                                {
2446                                    "name":"middle_field_1",
2447                                    "type":[
2448                                        "null",
2449                                        {
2450                                            "type":"record",
2451                                            "name":"inner_record_name",
2452                                            "fields":[
2453                                                {
2454                                                    "name":"inner_field_1",
2455                                                    "type":"double"
2456                                                }
2457                                            ]
2458                                        }
2459                                    ]
2460                                }
2461                            ]
2462                        }
2463                    ]
2464            },
2465            {
2466                "name": "outer_field_2",
2467                "type" : "middle_namespace.inner_record_name"
2468            }
2469          ]
2470        }
2471        "#;
2472        let schema = Schema::parse_str(schema)?;
2473        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2474        let middle_record_variation_1 = Value::Record(vec![(
2475            "middle_field_1".into(),
2476            Value::Union(0, Box::new(Value::Null)),
2477        )]);
2478        let middle_record_variation_2 = Value::Record(vec![(
2479            "middle_field_1".into(),
2480            Value::Union(1, Box::new(inner_record.clone())),
2481        )]);
2482        let outer_record_variation_1 = Value::Record(vec![
2483            (
2484                "outer_field_1".into(),
2485                Value::Union(0, Box::new(Value::Null)),
2486            ),
2487            ("outer_field_2".into(), inner_record.clone()),
2488        ]);
2489        let outer_record_variation_2 = Value::Record(vec![
2490            (
2491                "outer_field_1".into(),
2492                Value::Union(1, Box::new(middle_record_variation_1)),
2493            ),
2494            ("outer_field_2".into(), inner_record.clone()),
2495        ]);
2496        let outer_record_variation_3 = Value::Record(vec![
2497            (
2498                "outer_field_1".into(),
2499                Value::Union(1, Box::new(middle_record_variation_2)),
2500            ),
2501            ("outer_field_2".into(), inner_record),
2502        ]);
2503
2504        outer_record_variation_1
2505            .resolve(&schema)
2506            .expect("Should be able to resolve value to the schema that is it's definition");
2507        outer_record_variation_2
2508            .resolve(&schema)
2509            .expect("Should be able to resolve value to the schema that is it's definition");
2510        outer_record_variation_3
2511            .resolve(&schema)
2512            .expect("Should be able to resolve value to the schema that is it's definition");
2513
2514        Ok(())
2515    }
2516
2517    #[test]
2518    fn test_avro_3461_test_multi_level_resolve_inner_namespace() -> TestResult {
2519        let schema = r#"
2520        {
2521          "name": "record_name",
2522          "namespace": "space",
2523          "type": "record",
2524          "fields": [
2525            {
2526              "name": "outer_field_1",
2527              "type": [
2528                        "null",
2529                        {
2530                            "type": "record",
2531                            "name": "middle_record_name",
2532                            "namespace":"middle_namespace",
2533                            "fields":[
2534                                {
2535                                    "name":"middle_field_1",
2536                                    "type":[
2537                                        "null",
2538                                        {
2539                                            "type":"record",
2540                                            "name":"inner_record_name",
2541                                            "namespace":"inner_namespace",
2542                                            "fields":[
2543                                                {
2544                                                    "name":"inner_field_1",
2545                                                    "type":"double"
2546                                                }
2547                                            ]
2548                                        }
2549                                    ]
2550                                }
2551                            ]
2552                        }
2553                    ]
2554            },
2555            {
2556                "name": "outer_field_2",
2557                "type" : "inner_namespace.inner_record_name"
2558            }
2559          ]
2560        }
2561        "#;
2562        let schema = Schema::parse_str(schema)?;
2563
2564        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2565        let middle_record_variation_1 = Value::Record(vec![(
2566            "middle_field_1".into(),
2567            Value::Union(0, Box::new(Value::Null)),
2568        )]);
2569        let middle_record_variation_2 = Value::Record(vec![(
2570            "middle_field_1".into(),
2571            Value::Union(1, Box::new(inner_record.clone())),
2572        )]);
2573        let outer_record_variation_1 = Value::Record(vec![
2574            (
2575                "outer_field_1".into(),
2576                Value::Union(0, Box::new(Value::Null)),
2577            ),
2578            ("outer_field_2".into(), inner_record.clone()),
2579        ]);
2580        let outer_record_variation_2 = Value::Record(vec![
2581            (
2582                "outer_field_1".into(),
2583                Value::Union(1, Box::new(middle_record_variation_1)),
2584            ),
2585            ("outer_field_2".into(), inner_record.clone()),
2586        ]);
2587        let outer_record_variation_3 = Value::Record(vec![
2588            (
2589                "outer_field_1".into(),
2590                Value::Union(1, Box::new(middle_record_variation_2)),
2591            ),
2592            ("outer_field_2".into(), inner_record),
2593        ]);
2594
2595        outer_record_variation_1
2596            .resolve(&schema)
2597            .expect("Should be able to resolve value to the schema that is it's definition");
2598        outer_record_variation_2
2599            .resolve(&schema)
2600            .expect("Should be able to resolve value to the schema that is it's definition");
2601        outer_record_variation_3
2602            .resolve(&schema)
2603            .expect("Should be able to resolve value to the schema that is it's definition");
2604
2605        Ok(())
2606    }
2607
2608    #[test]
2609    fn test_avro_3460_validation_with_refs() -> TestResult {
2610        let schema = Schema::parse_str(
2611            r#"
2612        {
2613            "type":"record",
2614            "name":"TestStruct",
2615            "fields": [
2616                {
2617                    "name":"a",
2618                    "type":{
2619                        "type":"record",
2620                        "name": "Inner",
2621                        "fields": [ {
2622                            "name":"z",
2623                            "type":"int"
2624                        }]
2625                    }
2626                },
2627                {
2628                    "name":"b",
2629                    "type":"Inner"
2630                }
2631            ]
2632        }"#,
2633        )?;
2634
2635        let inner_value_right = Value::Record(vec![("z".into(), Value::Int(3))]);
2636        let inner_value_wrong1 = Value::Record(vec![("z".into(), Value::Null)]);
2637        let inner_value_wrong2 = Value::Record(vec![("a".into(), Value::String("testing".into()))]);
2638        let outer1 = Value::Record(vec![
2639            ("a".into(), inner_value_right.clone()),
2640            ("b".into(), inner_value_wrong1),
2641        ]);
2642
2643        let outer2 = Value::Record(vec![
2644            ("a".into(), inner_value_right),
2645            ("b".into(), inner_value_wrong2),
2646        ]);
2647
2648        assert!(
2649            !outer1.validate(&schema),
2650            "field b record is invalid against the schema"
2651        ); // this should pass, but doesn't
2652        assert!(
2653            !outer2.validate(&schema),
2654            "field b record is invalid against the schema"
2655        ); // this should pass, but doesn't
2656
2657        Ok(())
2658    }
2659
2660    #[test]
2661    fn test_avro_3460_validation_with_refs_real_struct() -> TestResult {
2662        use crate::ser::Serializer;
2663        use serde::Serialize;
2664
2665        #[derive(Serialize, Clone)]
2666        struct TestInner {
2667            z: i32,
2668        }
2669
2670        #[derive(Serialize)]
2671        struct TestRefSchemaStruct1 {
2672            a: TestInner,
2673            b: String, // could be literally anything
2674        }
2675
2676        #[derive(Serialize)]
2677        struct TestRefSchemaStruct2 {
2678            a: TestInner,
2679            b: i32, // could be literally anything
2680        }
2681
2682        #[derive(Serialize)]
2683        struct TestRefSchemaStruct3 {
2684            a: TestInner,
2685            b: Option<TestInner>, // could be literally anything
2686        }
2687
2688        let schema = Schema::parse_str(
2689            r#"
2690        {
2691            "type":"record",
2692            "name":"TestStruct",
2693            "fields": [
2694                {
2695                    "name":"a",
2696                    "type":{
2697                        "type":"record",
2698                        "name": "Inner",
2699                        "fields": [ {
2700                            "name":"z",
2701                            "type":"int"
2702                        }]
2703                    }
2704                },
2705                {
2706                    "name":"b",
2707                    "type":"Inner"
2708                }
2709            ]
2710        }"#,
2711        )?;
2712
2713        let test_inner = TestInner { z: 3 };
2714        let test_outer1 = TestRefSchemaStruct1 {
2715            a: test_inner.clone(),
2716            b: "testing".into(),
2717        };
2718        let test_outer2 = TestRefSchemaStruct2 {
2719            a: test_inner.clone(),
2720            b: 24,
2721        };
2722        let test_outer3 = TestRefSchemaStruct3 {
2723            a: test_inner,
2724            b: None,
2725        };
2726
2727        let mut ser = Serializer::default();
2728        let test_outer1: Value = test_outer1.serialize(&mut ser)?;
2729        let mut ser = Serializer::default();
2730        let test_outer2: Value = test_outer2.serialize(&mut ser)?;
2731        let mut ser = Serializer::default();
2732        let test_outer3: Value = test_outer3.serialize(&mut ser)?;
2733
2734        assert!(
2735            !test_outer1.validate(&schema),
2736            "field b record is invalid against the schema"
2737        );
2738        assert!(
2739            !test_outer2.validate(&schema),
2740            "field b record is invalid against the schema"
2741        );
2742        assert!(
2743            !test_outer3.validate(&schema),
2744            "field b record is invalid against the schema"
2745        );
2746
2747        Ok(())
2748    }
2749
2750    fn avro_3674_with_or_without_namespace(with_namespace: bool) -> TestResult {
2751        use crate::ser::Serializer;
2752        use serde::Serialize;
2753
2754        let schema_str = r#"{
2755            "type": "record",
2756            "name": "NamespacedMessage",
2757            [NAMESPACE]
2758            "fields": [
2759                {
2760                    "type": "record",
2761                    "name": "field_a",
2762                    "fields": [
2763                        {
2764                            "name": "enum_a",
2765                            "type": {
2766                                "type": "enum",
2767                                "name": "EnumType",
2768                                "symbols": [
2769                                    "SYMBOL_1",
2770                                    "SYMBOL_2"
2771                                ],
2772                                "default": "SYMBOL_1"
2773                            }
2774                        },
2775                        {
2776                            "name": "enum_b",
2777                            "type": "EnumType"
2778                        }
2779                    ]
2780                }
2781            ]
2782        }"#;
2783        let schema_str = schema_str.replace(
2784            "[NAMESPACE]",
2785            if with_namespace {
2786                r#""namespace": "com.domain","#
2787            } else {
2788                ""
2789            },
2790        );
2791
2792        let schema = Schema::parse_str(&schema_str)?;
2793
2794        #[derive(Serialize)]
2795        enum EnumType {
2796            #[serde(rename = "SYMBOL_1")]
2797            Symbol1,
2798            #[serde(rename = "SYMBOL_2")]
2799            Symbol2,
2800        }
2801
2802        #[derive(Serialize)]
2803        struct FieldA {
2804            enum_a: EnumType,
2805            enum_b: EnumType,
2806        }
2807
2808        #[derive(Serialize)]
2809        struct NamespacedMessage {
2810            field_a: FieldA,
2811        }
2812
2813        let msg = NamespacedMessage {
2814            field_a: FieldA {
2815                enum_a: EnumType::Symbol2,
2816                enum_b: EnumType::Symbol1,
2817            },
2818        };
2819
2820        let mut ser = Serializer::default();
2821        let test_value: Value = msg.serialize(&mut ser)?;
2822        assert!(test_value.validate(&schema), "test_value should validate");
2823        assert!(
2824            test_value.resolve(&schema).is_ok(),
2825            "test_value should resolve"
2826        );
2827
2828        Ok(())
2829    }
2830
2831    #[test]
2832    fn test_avro_3674_validate_no_namespace_resolution() -> TestResult {
2833        avro_3674_with_or_without_namespace(false)
2834    }
2835
2836    #[test]
2837    fn test_avro_3674_validate_with_namespace_resolution() -> TestResult {
2838        avro_3674_with_or_without_namespace(true)
2839    }
2840
2841    fn avro_3688_schema_resolution_panic(set_field_b: bool) -> TestResult {
2842        use crate::ser::Serializer;
2843        use serde::{Deserialize, Serialize};
2844
2845        let schema_str = r#"{
2846            "type": "record",
2847            "name": "Message",
2848            "fields": [
2849                {
2850                    "name": "field_a",
2851                    "type": [
2852                        "null",
2853                        {
2854                            "name": "Inner",
2855                            "type": "record",
2856                            "fields": [
2857                                {
2858                                    "name": "inner_a",
2859                                    "type": "string"
2860                                }
2861                            ]
2862                        }
2863                    ],
2864                    "default": null
2865                },
2866                {
2867                    "name": "field_b",
2868                    "type": [
2869                        "null",
2870                        "Inner"
2871                    ],
2872                    "default": null
2873                }
2874            ]
2875        }"#;
2876
2877        #[derive(Serialize, Deserialize)]
2878        struct Inner {
2879            inner_a: String,
2880        }
2881
2882        #[derive(Serialize, Deserialize)]
2883        struct Message {
2884            field_a: Option<Inner>,
2885            field_b: Option<Inner>,
2886        }
2887
2888        let schema = Schema::parse_str(schema_str)?;
2889
2890        let msg = Message {
2891            field_a: Some(Inner {
2892                inner_a: "foo".to_string(),
2893            }),
2894            field_b: if set_field_b {
2895                Some(Inner {
2896                    inner_a: "bar".to_string(),
2897                })
2898            } else {
2899                None
2900            },
2901        };
2902
2903        let mut ser = Serializer::default();
2904        let test_value: Value = msg.serialize(&mut ser)?;
2905        assert!(test_value.validate(&schema), "test_value should validate");
2906        assert!(
2907            test_value.resolve(&schema).is_ok(),
2908            "test_value should resolve"
2909        );
2910
2911        Ok(())
2912    }
2913
2914    #[test]
2915    fn test_avro_3688_field_b_not_set() -> TestResult {
2916        avro_3688_schema_resolution_panic(false)
2917    }
2918
2919    #[test]
2920    fn test_avro_3688_field_b_set() -> TestResult {
2921        avro_3688_schema_resolution_panic(true)
2922    }
2923
2924    #[test]
2925    fn test_avro_3764_use_resolve_schemata() -> TestResult {
2926        let referenced_schema =
2927            r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
2928        let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": "enumForReference"}]}"#;
2929
2930        let value: serde_json::Value = serde_json::from_str(
2931            r#"
2932            {
2933                "reference": "A"
2934            }
2935        "#,
2936        )?;
2937
2938        let avro_value = Value::from(value);
2939
2940        let schemas = Schema::parse_list(&[main_schema, referenced_schema])?;
2941
2942        let main_schema = schemas.first().unwrap();
2943        let schemata: Vec<_> = schemas.iter().skip(1).collect();
2944
2945        let resolve_result = avro_value.clone().resolve_schemata(main_schema, schemata);
2946
2947        assert!(
2948            resolve_result.is_ok(),
2949            "result of resolving with schemata should be ok, got: {:?}",
2950            resolve_result
2951        );
2952
2953        let resolve_result = avro_value.resolve(main_schema);
2954        assert!(
2955            resolve_result.is_err(),
2956            "result of resolving without schemata should be err, got: {:?}",
2957            resolve_result
2958        );
2959
2960        Ok(())
2961    }
2962
2963    #[test]
2964    fn test_avro_3767_union_resolve_complex_refs() -> TestResult {
2965        let referenced_enum =
2966            r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
2967        let referenced_record = r#"{"name": "recordForReference", "type": "record", "fields": [{"name": "refInRecord", "type": "enumForReference"}]}"#;
2968        let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": ["null", "recordForReference"]}]}"#;
2969
2970        let value: serde_json::Value = serde_json::from_str(
2971            r#"
2972            {
2973                "reference": {
2974                    "refInRecord": "A"
2975                }
2976            }
2977        "#,
2978        )?;
2979
2980        let avro_value = Value::from(value);
2981
2982        let schemata = Schema::parse_list(&[referenced_enum, referenced_record, main_schema])?;
2983
2984        let main_schema = schemata.last().unwrap();
2985        let other_schemata: Vec<&Schema> = schemata.iter().take(2).collect();
2986
2987        let resolve_result = avro_value.resolve_schemata(main_schema, other_schemata);
2988
2989        assert!(
2990            resolve_result.is_ok(),
2991            "result of resolving with schemata should be ok, got: {:?}",
2992            resolve_result
2993        );
2994
2995        assert!(
2996            resolve_result?.validate_schemata(schemata.iter().collect()),
2997            "result of validation with schemata should be true"
2998        );
2999
3000        Ok(())
3001    }
3002
3003    #[test]
3004    fn test_avro_3782_incorrect_decimal_resolving() -> TestResult {
3005        let schema = r#"{"name": "decimalSchema", "logicalType": "decimal", "type": "fixed", "precision": 8, "scale": 0, "size": 8}"#;
3006
3007        let avro_value = Value::Decimal(Decimal::from(
3008            BigInt::from(12345678u32).to_signed_bytes_be(),
3009        ));
3010        let schema = Schema::parse_str(schema)?;
3011        let resolve_result = avro_value.resolve(&schema);
3012        assert!(
3013            resolve_result.is_ok(),
3014            "resolve result must be ok, got: {resolve_result:?}"
3015        );
3016
3017        Ok(())
3018    }
3019
3020    #[test]
3021    fn test_avro_3779_bigdecimal_resolving() -> TestResult {
3022        let schema =
3023            r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#;
3024
3025        let avro_value = Value::BigDecimal(BigDecimal::from(12345678u32));
3026        let schema = Schema::parse_str(schema)?;
3027        let resolve_result: AvroResult<Value> = avro_value.resolve(&schema);
3028        assert!(
3029            resolve_result.is_ok(),
3030            "resolve result must be ok, got: {resolve_result:?}"
3031        );
3032
3033        Ok(())
3034    }
3035
3036    #[test]
3037    fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult {
3038        let value = Value::Bytes(vec![97, 98, 99]);
3039        assert_eq!(
3040            value.resolve(&Schema::Fixed(FixedSchema {
3041                name: "test".into(),
3042                aliases: None,
3043                doc: None,
3044                size: 3,
3045                default: None,
3046                attributes: Default::default()
3047            }))?,
3048            Value::Fixed(3, vec![97, 98, 99])
3049        );
3050
3051        let value = Value::Bytes(vec![97, 99]);
3052        assert!(value
3053            .resolve(&Schema::Fixed(FixedSchema {
3054                name: "test".into(),
3055                aliases: None,
3056                doc: None,
3057                size: 3,
3058                default: None,
3059                attributes: Default::default()
3060            }))
3061            .is_err(),);
3062
3063        let value = Value::Bytes(vec![97, 98, 99, 100]);
3064        assert!(value
3065            .resolve(&Schema::Fixed(FixedSchema {
3066                name: "test".into(),
3067                aliases: None,
3068                doc: None,
3069                size: 3,
3070                default: None,
3071                attributes: Default::default()
3072            }))
3073            .is_err(),);
3074
3075        Ok(())
3076    }
3077
3078    #[test]
3079    fn avro_3928_from_serde_value_to_types_value() {
3080        assert_eq!(Value::from(serde_json::Value::Null), Value::Null);
3081        assert_eq!(Value::from(json!(true)), Value::Boolean(true));
3082        assert_eq!(Value::from(json!(false)), Value::Boolean(false));
3083        assert_eq!(Value::from(json!(0)), Value::Int(0));
3084        assert_eq!(Value::from(json!(i32::MIN)), Value::Int(i32::MIN));
3085        assert_eq!(Value::from(json!(i32::MAX)), Value::Int(i32::MAX));
3086        assert_eq!(
3087            Value::from(json!(i32::MIN as i64 - 1)),
3088            Value::Long(i32::MIN as i64 - 1)
3089        );
3090        assert_eq!(
3091            Value::from(json!(i32::MAX as i64 + 1)),
3092            Value::Long(i32::MAX as i64 + 1)
3093        );
3094        assert_eq!(Value::from(json!(1.23)), Value::Double(1.23));
3095        assert_eq!(Value::from(json!(-1.23)), Value::Double(-1.23));
3096        assert_eq!(Value::from(json!(u64::MIN)), Value::Int(u64::MIN as i32));
3097        assert_eq!(Value::from(json!(u64::MAX)), Value::Long(u64::MAX as i64));
3098        assert_eq!(
3099            Value::from(json!("some text")),
3100            Value::String("some text".into())
3101        );
3102        assert_eq!(
3103            Value::from(json!(["text1", "text2", "text3"])),
3104            Value::Array(vec![
3105                Value::String("text1".into()),
3106                Value::String("text2".into()),
3107                Value::String("text3".into())
3108            ])
3109        );
3110        assert_eq!(
3111            Value::from(json!({"key1": "value1", "key2": "value2"})),
3112            Value::Map(
3113                vec![
3114                    ("key1".into(), Value::String("value1".into())),
3115                    ("key2".into(), Value::String("value2".into()))
3116                ]
3117                .into_iter()
3118                .collect()
3119            )
3120        );
3121    }
3122}