1use crate::{
20 bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
21 decimal::Decimal,
22 duration::Duration,
23 schema::{
24 DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
25 RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
26 },
27 AvroResult, Error,
28};
29use bigdecimal::BigDecimal;
30use serde_json::{Number, Value as JsonValue};
31use std::{
32 borrow::Borrow,
33 collections::{BTreeMap, HashMap},
34 fmt::Debug,
35 hash::BuildHasher,
36 str::FromStr,
37};
38use uuid::Uuid;
39
40fn max_prec_for_len(len: usize) -> Result<usize, Error> {
42 let len = i32::try_from(len).map_err(|e| Error::ConvertLengthToI32(e, len))?;
43 Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
44}
45
/// In-memory representation of an Avro datum.
///
/// The `strum` derive below also generates a companion enum named `ValueKind`
/// containing only the discriminants of this enum.
#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
#[strum_discriminants(name(ValueKind))]
pub enum Value {
    /// A `null` value.
    Null,
    /// A boolean value.
    Boolean(bool),
    /// A 32-bit signed integer.
    Int(i32),
    /// A 64-bit signed integer.
    Long(i64),
    /// A single-precision floating point number.
    Float(f32),
    /// A double-precision floating point number.
    Double(f64),
    /// A sequence of bytes.
    Bytes(Vec<u8>),
    /// A string.
    String(String),
    /// A fixed-size byte sequence: the declared size followed by the bytes.
    /// Validation compares the declared size with the schema's size.
    Fixed(usize, Vec<u8>),
    /// An enum value: the position of the symbol in the schema's symbol list
    /// and the symbol itself.
    Enum(u32, String),
    /// A union value: the position of the selected variant in the union schema
    /// and the wrapped value.
    Union(u32, Box<Value>),
    /// An array of values.
    Array(Vec<Value>),
    /// A map from string keys to values.
    Map(HashMap<String, Value>),
    /// A record, stored as a list of `(field name, field value)` pairs.
    Record(Vec<(String, Value)>),
    /// A date value carried as an `i32`; validates against `Schema::Date`.
    Date(i32),
    /// A decimal value; validates against `Schema::Decimal`.
    Decimal(Decimal),
    /// An arbitrary-precision decimal value; validates against `Schema::BigDecimal`.
    BigDecimal(BigDecimal),
    /// A time-of-day value carried as an `i32`; validates against `Schema::TimeMillis`.
    TimeMillis(i32),
    /// A time-of-day value carried as an `i64`; validates against `Schema::TimeMicros`.
    TimeMicros(i64),
    /// A timestamp carried as an `i64`; validates against `Schema::TimestampMillis`.
    TimestampMillis(i64),
    /// A timestamp carried as an `i64`; validates against `Schema::TimestampMicros`.
    TimestampMicros(i64),
    /// A timestamp carried as an `i64`; validates against `Schema::TimestampNanos`.
    TimestampNanos(i64),
    /// A local timestamp carried as an `i64`; validates against `Schema::LocalTimestampMillis`.
    LocalTimestampMillis(i64),
    /// A local timestamp carried as an `i64`; validates against `Schema::LocalTimestampMicros`.
    LocalTimestampMicros(i64),
    /// A local timestamp carried as an `i64`; validates against `Schema::LocalTimestampNanos`.
    LocalTimestampNanos(i64),
    /// A duration value; convertible to and from a 12-byte array.
    Duration(Duration),
    /// A UUID value.
    Uuid(Uuid),
}
127
/// Conversion of a Rust value into an Avro [`Value`].
///
/// Deprecated since 0.11.0: use `Value::from`, `Into::into` or `value.into()` instead.
#[deprecated(
    since = "0.11.0",
    note = "Please use Value::from, Into::into or value.into() instead"
)]
pub trait ToAvro {
    /// Consumes `self` and returns the corresponding [`Value`].
    fn avro(self) -> Value;
}
138
139#[allow(deprecated)]
140impl<T: Into<Value>> ToAvro for T {
141 fn avro(self) -> Value {
142 self.into()
143 }
144}
145
// Generates `impl From<$source> for Value` by passing the incoming value to
// `$wrap` (typically a tuple-variant constructor such as `Value::Int`).
macro_rules! to_value {
    ($source:ty, $wrap:expr) => {
        impl From<$source> for Value {
            fn from(inner: $source) -> Self {
                $wrap(inner)
            }
        }
    };
}
155
// `From<T> for Value` implementations for types that map one-to-one onto a
// single `Value` variant.
to_value!(bool, Value::Boolean);
to_value!(i32, Value::Int);
to_value!(i64, Value::Long);
to_value!(f32, Value::Float);
to_value!(f64, Value::Double);
to_value!(String, Value::String);
to_value!(Vec<u8>, Value::Bytes);
to_value!(uuid::Uuid, Value::Uuid);
to_value!(Decimal, Value::Decimal);
to_value!(BigDecimal, Value::BigDecimal);
to_value!(Duration, Value::Duration);
167
168impl From<()> for Value {
169 fn from(_: ()) -> Self {
170 Self::Null
171 }
172}
173
174impl From<usize> for Value {
175 fn from(value: usize) -> Self {
176 i64::try_from(value)
177 .expect("cannot convert usize to i64")
178 .into()
179 }
180}
181
182impl From<&str> for Value {
183 fn from(value: &str) -> Self {
184 Self::String(value.to_owned())
185 }
186}
187
188impl From<&[u8]> for Value {
189 fn from(value: &[u8]) -> Self {
190 Self::Bytes(value.to_owned())
191 }
192}
193
194impl<T> From<Option<T>> for Value
195where
196 T: Into<Self>,
197{
198 fn from(value: Option<T>) -> Self {
199 Self::Union(
201 value.is_some() as u32,
202 Box::new(value.map_or_else(|| Self::Null, Into::into)),
203 )
204 }
205}
206
207impl<K, V, S> From<HashMap<K, V, S>> for Value
208where
209 K: Into<String>,
210 V: Into<Self>,
211 S: BuildHasher,
212{
213 fn from(value: HashMap<K, V, S>) -> Self {
214 Self::Map(
215 value
216 .into_iter()
217 .map(|(key, value)| (key.into(), value.into()))
218 .collect(),
219 )
220 }
221}
222
/// Helper for building a record value field by field.
///
/// Created from a record schema via `Record::new` and convertible into a
/// `Value::Record` through `From<Record>`.
#[derive(Debug, Clone)]
pub struct Record<'a> {
    /// `(field name, field value)` pairs, in the order of the schema's fields.
    /// Every value starts out as `Value::Null`.
    pub fields: Vec<(String, Value)>,
    // Borrowed from the schema: maps a field name to its position in `fields`.
    schema_lookup: &'a BTreeMap<String, usize>,
}
232
233impl<'a> Record<'a> {
234 pub fn new(schema: &Schema) -> Option<Record> {
238 match *schema {
239 Schema::Record(RecordSchema {
240 fields: ref schema_fields,
241 lookup: ref schema_lookup,
242 ..
243 }) => {
244 let mut fields = Vec::with_capacity(schema_fields.len());
245 for schema_field in schema_fields.iter() {
246 fields.push((schema_field.name.clone(), Value::Null));
247 }
248
249 Some(Record {
250 fields,
251 schema_lookup,
252 })
253 }
254 _ => None,
255 }
256 }
257
258 pub fn put<V>(&mut self, field: &str, value: V)
264 where
265 V: Into<Value>,
266 {
267 if let Some(&position) = self.schema_lookup.get(field) {
268 self.fields[position].1 = value.into()
269 }
270 }
271}
272
273impl<'a> From<Record<'a>> for Value {
274 fn from(value: Record<'a>) -> Self {
275 Self::Record(value.fields)
276 }
277}
278
279impl From<JsonValue> for Value {
280 fn from(value: JsonValue) -> Self {
281 match value {
282 JsonValue::Null => Self::Null,
283 JsonValue::Bool(b) => b.into(),
284 JsonValue::Number(ref n) if n.is_i64() => {
285 let n = n.as_i64().unwrap();
286 if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
287 Value::Int(n as i32)
288 } else {
289 Value::Long(n)
290 }
291 }
292 JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
293 JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), JsonValue::String(s) => s.into(),
295 JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
296 JsonValue::Object(items) => Value::Map(
297 items
298 .into_iter()
299 .map(|(key, value)| (key, value.into()))
300 .collect(),
301 ),
302 }
303 }
304}
305
306impl TryFrom<Value> for JsonValue {
308 type Error = crate::error::Error;
309 fn try_from(value: Value) -> AvroResult<Self> {
310 match value {
311 Value::Null => Ok(Self::Null),
312 Value::Boolean(b) => Ok(Self::Bool(b)),
313 Value::Int(i) => Ok(Self::Number(i.into())),
314 Value::Long(l) => Ok(Self::Number(l.into())),
315 Value::Float(f) => Number::from_f64(f.into())
316 .map(Self::Number)
317 .ok_or_else(|| Error::ConvertF64ToJson(f.into())),
318 Value::Double(d) => Number::from_f64(d)
319 .map(Self::Number)
320 .ok_or(Error::ConvertF64ToJson(d)),
321 Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
322 Value::String(s) => Ok(Self::String(s)),
323 Value::Fixed(_size, items) => {
324 Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
325 }
326 Value::Enum(_i, s) => Ok(Self::String(s)),
327 Value::Union(_i, b) => Self::try_from(*b),
328 Value::Array(items) => items
329 .into_iter()
330 .map(Self::try_from)
331 .collect::<Result<Vec<_>, _>>()
332 .map(Self::Array),
333 Value::Map(items) => items
334 .into_iter()
335 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
336 .collect::<Result<Vec<_>, _>>()
337 .map(|v| Self::Object(v.into_iter().collect())),
338 Value::Record(items) => items
339 .into_iter()
340 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
341 .collect::<Result<Vec<_>, _>>()
342 .map(|v| Self::Object(v.into_iter().collect())),
343 Value::Date(d) => Ok(Self::Number(d.into())),
344 Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
345 .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
346 Value::BigDecimal(ref bg) => {
347 let vec1: Vec<u8> = serialize_big_decimal(bg);
348 Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
349 }
350 Value::TimeMillis(t) => Ok(Self::Number(t.into())),
351 Value::TimeMicros(t) => Ok(Self::Number(t.into())),
352 Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
353 Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
354 Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
355 Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
356 Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
357 Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
358 Value::Duration(d) => Ok(Self::Array(
359 <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
360 )),
361 Value::Uuid(uuid) => Ok(Self::String(uuid.as_hyphenated().to_string())),
362 }
363 }
364}
365
366impl Value {
367 pub fn validate(&self, schema: &Schema) -> bool {
372 self.validate_schemata(vec![schema])
373 }
374
375 pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
376 let rs = ResolvedSchema::try_from(schemata.clone())
377 .expect("Schemata didn't successfully resolve");
378 let schemata_len = schemata.len();
379 schemata.iter().any(|schema| {
380 let enclosing_namespace = schema.namespace();
381
382 match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
383 Some(reason) => {
384 let log_message = format!(
385 "Invalid value: {:?} for schema: {:?}. Reason: {}",
386 self, schema, reason
387 );
388 if schemata_len == 1 {
389 error!("{}", log_message);
390 } else {
391 debug!("{}", log_message);
392 };
393 false
394 }
395 None => true,
396 }
397 })
398 }
399
400 fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
401 match (accumulator, other) {
402 (None, None) => None,
403 (None, s @ Some(_)) => s,
404 (s @ Some(_), None) => s,
405 (Some(reason1), Some(reason2)) => Some(format!("{reason1}\n{reason2}")),
406 }
407 }
408
    /// Validates this value against `schema`.
    ///
    /// `names` maps fully qualified names to the schemata they refer to and is
    /// used to follow `Schema::Ref`; `enclosing_namespace` is the namespace used
    /// to qualify such references.
    ///
    /// Returns `None` when the value is valid, otherwise `Some(reason)`. For
    /// arrays, maps and records the reasons of all failing elements are joined
    /// with newlines.
    ///
    /// The order of the match arms matters: more specific combinations must come
    /// before the generic union arm and the final catch-all.
    pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
        &self,
        schema: &Schema,
        names: &HashMap<Name, S>,
        enclosing_namespace: &Namespace,
    ) -> Option<String> {
        match (self, schema) {
            // Follow a named reference and validate against the schema it points to,
            // using the namespace of the referenced name from there on.
            (_, Schema::Ref { name }) => {
                let name = name.fully_qualified_name(enclosing_namespace);
                names.get(&name).map_or_else(
                    || {
                        Some(format!(
                            "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
                            name,
                            names.keys()
                        ))
                    },
                    |s| self.validate_internal(s.borrow(), names, &name.namespace),
                )
            }
            // Value/schema pairs that are accepted without inspecting the payload.
            (&Value::Null, &Schema::Null) => None,
            (&Value::Boolean(_), &Schema::Boolean) => None,
            (&Value::Int(_), &Schema::Int) => None,
            (&Value::Int(_), &Schema::Date) => None,
            (&Value::Int(_), &Schema::TimeMillis) => None,
            (&Value::Int(_), &Schema::Long) => None,
            (&Value::Long(_), &Schema::Long) => None,
            (&Value::Long(_), &Schema::TimeMicros) => None,
            (&Value::Long(_), &Schema::TimestampMillis) => None,
            (&Value::Long(_), &Schema::TimestampMicros) => None,
            (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
            (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
            (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
            (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
            (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
            (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
            (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
            (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
            (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
            (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
            (&Value::Date(_), &Schema::Date) => None,
            (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
            (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
            (&Value::Duration(_), &Schema::Duration) => None,
            (&Value::Uuid(_), &Schema::Uuid) => None,
            (&Value::Float(_), &Schema::Float) => None,
            (&Value::Float(_), &Schema::Double) => None,
            (&Value::Double(_), &Schema::Double) => None,
            (&Value::Bytes(_), &Schema::Bytes) => None,
            (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
            (&Value::String(_), &Schema::String) => None,
            (&Value::String(_), &Schema::Uuid) => None,
            // Fixed: the declared size must equal the schema's size.
            (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => {
                if n != size {
                    Some(format!(
                        "The value's size ({n}) is different than the schema's size ({size})"
                    ))
                } else {
                    None
                }
            }
            // Bytes against a fixed schema: the byte count must equal the schema's size.
            (Value::Bytes(b), &Schema::Fixed(FixedSchema { size, .. })) => {
                if b.len() != size {
                    Some(format!(
                        "The bytes' length ({}) is different than the schema's size ({})",
                        b.len(),
                        size
                    ))
                } else {
                    None
                }
            }
            // A fixed value is accepted as a duration only when its declared size is 12.
            (&Value::Fixed(n, _), &Schema::Duration) => {
                if n != 12 {
                    Some(format!(
                        "The value's size ('{n}') must be exactly 12 to be a Duration"
                    ))
                } else {
                    None
                }
            }
            // NOTE(review): no size or precision check is performed for this combination.
            (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
            // A plain string is a valid enum value when it is one of the symbols.
            (Value::String(s), Schema::Enum(EnumSchema { symbols, .. })) => {
                if !symbols.contains(s) {
                    Some(format!("'{s}' is not a member of the possible symbols"))
                } else {
                    None
                }
            }
            // An enum value must name the symbol found at its index. An index that is
            // out of range is accepted only when the schema declares a default.
            (
                &Value::Enum(i, ref s),
                Schema::Enum(EnumSchema {
                    symbols, default, ..
                }),
            ) => symbols
                .get(i as usize)
                .map(|ref symbol| {
                    if symbol != &s {
                        Some(format!("Symbol '{s}' is not at position '{i}'"))
                    } else {
                        None
                    }
                })
                .unwrap_or_else(|| match default {
                    Some(_) => None,
                    None => Some(format!("No symbol at position '{i}'")),
                }),
            // An explicit union value is validated against the variant at its index.
            (&Value::Union(i, ref value), Schema::Union(inner)) => inner
                .variants()
                .get(i as usize)
                .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
                .unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
            // Any other value is valid for a union when some variant matches it.
            (v, Schema::Union(inner)) => {
                match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
                    Some(_) => None,
                    None => Some("Could not find matching type in union".to_string()),
                }
            }
            // Arrays and maps: validate every element and collect all failure reasons.
            (Value::Array(items), Schema::Array(inner)) => items.iter().fold(None, |acc, item| {
                Value::accumulate(
                    acc,
                    item.validate_internal(&inner.items, names, enclosing_namespace),
                )
            }),
            (Value::Map(items), Schema::Map(inner)) => {
                items.iter().fold(None, |acc, (_, value)| {
                    Value::accumulate(
                        acc,
                        value.validate_internal(&inner.types, names, enclosing_namespace),
                    )
                })
            }
            (
                Value::Record(record_fields),
                Schema::Record(RecordSchema {
                    fields,
                    lookup,
                    name,
                    ..
                }),
            ) => {
                let non_nullable_fields_count =
                    fields.iter().filter(|&rf| !rf.is_nullable()).count();

                // The value may omit nullable fields but may never carry more fields
                // than the schema declares.
                if record_fields.len() < non_nullable_fields_count {
                    return Some(format!(
                        "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
                        record_fields.len(),
                        non_nullable_fields_count
                    ));
                } else if record_fields.len() > fields.len() {
                    return Some(format!(
                        "The value's records length ({}) is greater than the schema's ({} fields)",
                        record_fields.len(),
                        fields.len(),
                    ));
                }

                record_fields
                    .iter()
                    .fold(None, |acc, (field_name, record_field)| {
                        // Fields are validated in the record's own namespace when it has
                        // one, otherwise in the enclosing namespace.
                        let record_namespace = if name.namespace.is_none() {
                            enclosing_namespace
                        } else {
                            &name.namespace
                        };
                        match lookup.get(field_name) {
                            Some(idx) => {
                                let field = &fields[*idx];
                                Value::accumulate(
                                    acc,
                                    record_field.validate_internal(
                                        &field.schema,
                                        names,
                                        record_namespace,
                                    ),
                                )
                            }
                            None => Value::accumulate(
                                acc,
                                Some(format!("There is no schema field for field '{field_name}'")),
                            ),
                        }
                    })
            }
            // A map may stand in for a record: every schema field that is not nullable
            // must be present, and every present field must validate.
            (Value::Map(items), Schema::Record(RecordSchema { fields, .. })) => {
                fields.iter().fold(None, |acc, field| {
                    if let Some(item) = items.get(&field.name) {
                        let res = item.validate_internal(&field.schema, names, enclosing_namespace);
                        Value::accumulate(acc, res)
                    } else if !field.is_nullable() {
                        Value::accumulate(
                            acc,
                            Some(format!(
                                "Field with name '{:?}' is not a member of the map items",
                                field.name
                            )),
                        )
                    } else {
                        acc
                    }
                })
            }
            // Everything else is an unsupported combination.
            (v, s) => Some(format!(
                "Unsupported value-schema combination! Value: {:?}, schema: {:?}",
                v, s
            )),
        }
    }
622
623 pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
630 let enclosing_namespace = schema.namespace();
631 let rs = ResolvedSchema::try_from(schema)?;
632 self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
633 }
634
635 pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
642 let enclosing_namespace = schema.namespace();
643 let rs = ResolvedSchema::try_from(schemata)?;
644 self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
645 }
646
647 pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
648 mut self,
649 schema: &Schema,
650 names: &HashMap<Name, S>,
651 enclosing_namespace: &Namespace,
652 field_default: &Option<JsonValue>,
653 ) -> AvroResult<Self> {
654 if SchemaKind::from(&self) == SchemaKind::Union
656 && SchemaKind::from(schema) != SchemaKind::Union
657 {
658 let v = match self {
660 Value::Union(_i, b) => *b,
661 _ => unreachable!(),
662 };
663 self = v;
664 }
665 match *schema {
666 Schema::Ref { ref name } => {
667 let name = name.fully_qualified_name(enclosing_namespace);
668
669 if let Some(resolved) = names.get(&name) {
670 debug!("Resolved {:?}", name);
671 self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
672 } else {
673 error!("Failed to resolve schema {:?}", name);
674 Err(Error::SchemaResolutionError(name.clone()))
675 }
676 }
677 Schema::Null => self.resolve_null(),
678 Schema::Boolean => self.resolve_boolean(),
679 Schema::Int => self.resolve_int(),
680 Schema::Long => self.resolve_long(),
681 Schema::Float => self.resolve_float(),
682 Schema::Double => self.resolve_double(),
683 Schema::Bytes => self.resolve_bytes(),
684 Schema::String => self.resolve_string(),
685 Schema::Fixed(FixedSchema { size, .. }) => self.resolve_fixed(size),
686 Schema::Union(ref inner) => {
687 self.resolve_union(inner, names, enclosing_namespace, field_default)
688 }
689 Schema::Enum(EnumSchema {
690 ref symbols,
691 ref default,
692 ..
693 }) => self.resolve_enum(symbols, default, field_default),
694 Schema::Array(ref inner) => {
695 self.resolve_array(&inner.items, names, enclosing_namespace)
696 }
697 Schema::Map(ref inner) => self.resolve_map(&inner.types, names, enclosing_namespace),
698 Schema::Record(RecordSchema { ref fields, .. }) => {
699 self.resolve_record(fields, names, enclosing_namespace)
700 }
701 Schema::Decimal(DecimalSchema {
702 scale,
703 precision,
704 ref inner,
705 }) => self.resolve_decimal(precision, scale, inner),
706 Schema::BigDecimal => self.resolve_bigdecimal(),
707 Schema::Date => self.resolve_date(),
708 Schema::TimeMillis => self.resolve_time_millis(),
709 Schema::TimeMicros => self.resolve_time_micros(),
710 Schema::TimestampMillis => self.resolve_timestamp_millis(),
711 Schema::TimestampMicros => self.resolve_timestamp_micros(),
712 Schema::TimestampNanos => self.resolve_timestamp_nanos(),
713 Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
714 Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
715 Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
716 Schema::Duration => self.resolve_duration(),
717 Schema::Uuid => self.resolve_uuid(),
718 }
719 }
720
721 fn resolve_uuid(self) -> Result<Self, Error> {
722 Ok(match self {
723 uuid @ Value::Uuid(_) => uuid,
724 Value::String(ref string) => {
725 Value::Uuid(Uuid::from_str(string).map_err(Error::ConvertStrToUuid)?)
726 }
727 other => return Err(Error::GetUuid(other.into())),
728 })
729 }
730
731 fn resolve_bigdecimal(self) -> Result<Self, Error> {
732 Ok(match self {
733 bg @ Value::BigDecimal(_) => bg,
734 Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
735 other => return Err(Error::GetBigdecimal(other.into())),
736 })
737 }
738
739 fn resolve_duration(self) -> Result<Self, Error> {
740 Ok(match self {
741 duration @ Value::Duration { .. } => duration,
742 Value::Fixed(size, bytes) => {
743 if size != 12 {
744 return Err(Error::GetDecimalFixedBytes(size));
745 }
746 Value::Duration(Duration::from([
747 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
748 bytes[8], bytes[9], bytes[10], bytes[11],
749 ]))
750 }
751 other => return Err(Error::ResolveDuration(other.into())),
752 })
753 }
754
755 fn resolve_decimal(
756 self,
757 precision: Precision,
758 scale: Scale,
759 inner: &Schema,
760 ) -> Result<Self, Error> {
761 if scale > precision {
762 return Err(Error::GetScaleAndPrecision { scale, precision });
763 }
764 match inner {
765 &Schema::Fixed(FixedSchema { size, .. }) => {
766 if max_prec_for_len(size)? < precision {
767 return Err(Error::GetScaleWithFixedSize { size, precision });
768 }
769 }
770 Schema::Bytes => (),
771 _ => return Err(Error::ResolveDecimalSchema(inner.into())),
772 };
773 match self {
774 Value::Decimal(num) => {
775 let num_bytes = num.len();
776 if max_prec_for_len(num_bytes)? < precision {
777 Err(Error::ComparePrecisionAndSize {
778 precision,
779 num_bytes,
780 })
781 } else {
782 Ok(Value::Decimal(num))
783 }
784 }
786 Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
787 if max_prec_for_len(bytes.len())? < precision {
788 Err(Error::ComparePrecisionAndSize {
789 precision,
790 num_bytes: bytes.len(),
791 })
792 } else {
793 Ok(Value::Decimal(Decimal::from(bytes)))
795 }
796 }
797 other => Err(Error::ResolveDecimal(other.into())),
798 }
799 }
800
801 fn resolve_date(self) -> Result<Self, Error> {
802 match self {
803 Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
804 other => Err(Error::GetDate(other.into())),
805 }
806 }
807
808 fn resolve_time_millis(self) -> Result<Self, Error> {
809 match self {
810 Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
811 other => Err(Error::GetTimeMillis(other.into())),
812 }
813 }
814
815 fn resolve_time_micros(self) -> Result<Self, Error> {
816 match self {
817 Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
818 Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
819 other => Err(Error::GetTimeMicros(other.into())),
820 }
821 }
822
823 fn resolve_timestamp_millis(self) -> Result<Self, Error> {
824 match self {
825 Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
826 Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
827 other => Err(Error::GetTimestampMillis(other.into())),
828 }
829 }
830
831 fn resolve_timestamp_micros(self) -> Result<Self, Error> {
832 match self {
833 Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
834 Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
835 other => Err(Error::GetTimestampMicros(other.into())),
836 }
837 }
838
839 fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
840 match self {
841 Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
842 Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
843 other => Err(Error::GetTimestampNanos(other.into())),
844 }
845 }
846
847 fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
848 match self {
849 Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
850 Ok(Value::LocalTimestampMillis(ts))
851 }
852 Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
853 other => Err(Error::GetLocalTimestampMillis(other.into())),
854 }
855 }
856
857 fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
858 match self {
859 Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
860 Ok(Value::LocalTimestampMicros(ts))
861 }
862 Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
863 other => Err(Error::GetLocalTimestampMicros(other.into())),
864 }
865 }
866
867 fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
868 match self {
869 Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
870 Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
871 other => Err(Error::GetLocalTimestampNanos(other.into())),
872 }
873 }
874
875 fn resolve_null(self) -> Result<Self, Error> {
876 match self {
877 Value::Null => Ok(Value::Null),
878 other => Err(Error::GetNull(other.into())),
879 }
880 }
881
882 fn resolve_boolean(self) -> Result<Self, Error> {
883 match self {
884 Value::Boolean(b) => Ok(Value::Boolean(b)),
885 other => Err(Error::GetBoolean(other.into())),
886 }
887 }
888
889 fn resolve_int(self) -> Result<Self, Error> {
890 match self {
891 Value::Int(n) => Ok(Value::Int(n)),
892 Value::Long(n) => Ok(Value::Int(n as i32)),
893 other => Err(Error::GetInt(other.into())),
894 }
895 }
896
897 fn resolve_long(self) -> Result<Self, Error> {
898 match self {
899 Value::Int(n) => Ok(Value::Long(i64::from(n))),
900 Value::Long(n) => Ok(Value::Long(n)),
901 other => Err(Error::GetLong(other.into())),
902 }
903 }
904
905 fn resolve_float(self) -> Result<Self, Error> {
906 match self {
907 Value::Int(n) => Ok(Value::Float(n as f32)),
908 Value::Long(n) => Ok(Value::Float(n as f32)),
909 Value::Float(x) => Ok(Value::Float(x)),
910 Value::Double(x) => Ok(Value::Float(x as f32)),
911 other => Err(Error::GetFloat(other.into())),
912 }
913 }
914
915 fn resolve_double(self) -> Result<Self, Error> {
916 match self {
917 Value::Int(n) => Ok(Value::Double(f64::from(n))),
918 Value::Long(n) => Ok(Value::Double(n as f64)),
919 Value::Float(x) => Ok(Value::Double(f64::from(x))),
920 Value::Double(x) => Ok(Value::Double(x)),
921 other => Err(Error::GetDouble(other.into())),
922 }
923 }
924
925 fn resolve_bytes(self) -> Result<Self, Error> {
926 match self {
927 Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
928 Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
929 Value::Array(items) => Ok(Value::Bytes(
930 items
931 .into_iter()
932 .map(Value::try_u8)
933 .collect::<Result<Vec<_>, _>>()?,
934 )),
935 other => Err(Error::GetBytes(other.into())),
936 }
937 }
938
939 fn resolve_string(self) -> Result<Self, Error> {
940 match self {
941 Value::String(s) => Ok(Value::String(s)),
942 Value::Bytes(bytes) | Value::Fixed(_, bytes) => Ok(Value::String(
943 String::from_utf8(bytes).map_err(Error::ConvertToUtf8)?,
944 )),
945 other => Err(Error::GetString(other.into())),
946 }
947 }
948
949 fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
950 match self {
951 Value::Fixed(n, bytes) => {
952 if n == size {
953 Ok(Value::Fixed(n, bytes))
954 } else {
955 Err(Error::CompareFixedSizes { size, n })
956 }
957 }
958 Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())),
959 Value::Bytes(s) => {
960 if s.len() == size {
961 Ok(Value::Fixed(size, s))
962 } else {
963 Err(Error::CompareFixedSizes { size, n: s.len() })
964 }
965 }
966 other => Err(Error::GetStringForFixed(other.into())),
967 }
968 }
969
970 pub(crate) fn resolve_enum(
971 self,
972 symbols: &[String],
973 enum_default: &Option<String>,
974 _field_default: &Option<JsonValue>,
975 ) -> Result<Self, Error> {
976 let validate_symbol = |symbol: String, symbols: &[String]| {
977 if let Some(index) = symbols.iter().position(|item| item == &symbol) {
978 Ok(Value::Enum(index as u32, symbol))
979 } else {
980 match enum_default {
981 Some(default) => {
982 if let Some(index) = symbols.iter().position(|item| item == default) {
983 Ok(Value::Enum(index as u32, default.clone()))
984 } else {
985 Err(Error::GetEnumDefault {
986 symbol,
987 symbols: symbols.into(),
988 })
989 }
990 }
991 _ => Err(Error::GetEnumDefault {
992 symbol,
993 symbols: symbols.into(),
994 }),
995 }
996 }
997 };
998
999 match self {
1000 Value::Enum(_raw_index, s) => validate_symbol(s, symbols),
1001 Value::String(s) => validate_symbol(s, symbols),
1002 other => Err(Error::GetEnum(other.into())),
1003 }
1004 }
1005
1006 fn resolve_union<S: Borrow<Schema> + Debug>(
1007 self,
1008 schema: &UnionSchema,
1009 names: &HashMap<Name, S>,
1010 enclosing_namespace: &Namespace,
1011 field_default: &Option<JsonValue>,
1012 ) -> Result<Self, Error> {
1013 let v = match self {
1014 Value::Union(_i, v) => *v,
1016 v => v,
1018 };
1019 let (i, inner) = schema
1020 .find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
1021 .ok_or(Error::FindUnionVariant)?;
1022
1023 Ok(Value::Union(
1024 i as u32,
1025 Box::new(v.resolve_internal(inner, names, enclosing_namespace, field_default)?),
1026 ))
1027 }
1028
1029 fn resolve_array<S: Borrow<Schema> + Debug>(
1030 self,
1031 schema: &Schema,
1032 names: &HashMap<Name, S>,
1033 enclosing_namespace: &Namespace,
1034 ) -> Result<Self, Error> {
1035 match self {
1036 Value::Array(items) => Ok(Value::Array(
1037 items
1038 .into_iter()
1039 .map(|item| item.resolve_internal(schema, names, enclosing_namespace, &None))
1040 .collect::<Result<_, _>>()?,
1041 )),
1042 other => Err(Error::GetArray {
1043 expected: schema.into(),
1044 other: other.into(),
1045 }),
1046 }
1047 }
1048
1049 fn resolve_map<S: Borrow<Schema> + Debug>(
1050 self,
1051 schema: &Schema,
1052 names: &HashMap<Name, S>,
1053 enclosing_namespace: &Namespace,
1054 ) -> Result<Self, Error> {
1055 match self {
1056 Value::Map(items) => Ok(Value::Map(
1057 items
1058 .into_iter()
1059 .map(|(key, value)| {
1060 value
1061 .resolve_internal(schema, names, enclosing_namespace, &None)
1062 .map(|value| (key, value))
1063 })
1064 .collect::<Result<_, _>>()?,
1065 )),
1066 other => Err(Error::GetMap {
1067 expected: schema.into(),
1068 other: other.into(),
1069 }),
1070 }
1071 }
1072
    /// Resolves to `Value::Record` for a record schema with the given `fields`.
    ///
    /// Accepts a map or a record. For every schema field, the supplied value is
    /// used when present; otherwise the field's default is used. The chosen
    /// value is then resolved against the field's schema. The result lists the
    /// fields in schema order.
    ///
    /// # Errors
    ///
    /// Returns `Error::GetRecord` when the value is neither a map nor a record,
    /// `Error::GetField` when a field has neither a value nor a default, and any
    /// error raised while resolving a field.
    fn resolve_record<S: Borrow<Schema> + Debug>(
        self,
        fields: &[RecordField],
        names: &HashMap<Name, S>,
        enclosing_namespace: &Namespace,
    ) -> Result<Self, Error> {
        // Index the supplied values by field name so each schema field can claim its own.
        let mut items = match self {
            Value::Map(items) => Ok(items),
            Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
            other => Err(Error::GetRecord {
                expected: fields
                    .iter()
                    .map(|field| (field.name.clone(), field.schema.clone().into()))
                    .collect(),
                other: other.into(),
            }),
        }?;

        let new_fields = fields
            .iter()
            .map(|field| {
                let value = match items.remove(&field.name) {
                    Some(value) => value,
                    // No value supplied: build one from the field's default.
                    None => match field.default {
                        Some(ref value) => match field.schema {
                            Schema::Enum(EnumSchema {
                                ref symbols,
                                ref default,
                                ..
                            }) => Value::from(value.clone()).resolve_enum(
                                symbols,
                                default,
                                &field.default.clone(),
                            )?,
                            Schema::Union(ref union_schema) => {
                                // The default of a union field is interpreted against
                                // the union's first variant.
                                let first = &union_schema.variants()[0];
                                match first {
                                    // A null first variant yields a null default,
                                    // whatever the JSON default contains.
                                    Schema::Null => Value::Union(0, Box::new(Value::Null)),
                                    _ => Value::Union(
                                        0,
                                        Box::new(Value::from(value.clone()).resolve_internal(
                                            first,
                                            names,
                                            enclosing_namespace,
                                            &field.default,
                                        )?),
                                    ),
                                }
                            }
                            _ => Value::from(value.clone()),
                        },
                        None => {
                            return Err(Error::GetField(field.name.clone()));
                        }
                    },
                };
                value
                    .resolve_internal(&field.schema, names, enclosing_namespace, &field.default)
                    .map(|value| (field.name.clone(), value))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(Value::Record(new_fields))
    }
1139
1140 fn try_u8(self) -> AvroResult<u8> {
1141 let int = self.resolve(&Schema::Int)?;
1142 if let Value::Int(n) = int {
1143 if n >= 0 && n <= i32::from(u8::MAX) {
1144 return Ok(n as u8);
1145 }
1146 }
1147
1148 Err(Error::GetU8(int.into()))
1149 }
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154 use super::*;
1155 use crate::{
1156 duration::{Days, Millis, Months},
1157 schema::RecordFieldOrder,
1158 };
1159 use apache_avro_test_helper::{
1160 logger::{assert_logged, assert_not_logged},
1161 TestResult,
1162 };
1163 use num_bigint::BigInt;
1164 use pretty_assertions::assert_eq;
1165 use serde_json::json;
1166
    // Validates a three-level nested record. The innermost record declares no
    // namespace of its own, and `middle_field_2` refers to it by its unqualified
    // name `inner_record_name`, so validation has to resolve that reference.
    #[test]
    fn avro_3809_validate_nested_records_with_implicit_namespace() -> TestResult {
        let schema = Schema::parse_str(
            r#"{
            "name": "record_name",
            "namespace": "space",
            "type": "record",
            "fields": [
              {
                "name": "outer_field_1",
                "type": {
                  "type": "record",
                  "name": "middle_record_name",
                  "namespace": "middle_namespace",
                  "fields": [
                    {
                      "name": "middle_field_1",
                      "type": {
                        "type": "record",
                        "name": "inner_record_name",
                        "fields": [
                          { "name": "inner_field_1", "type": "double" }
                        ]
                      }
                    },
                    { "name": "middle_field_2", "type": "inner_record_name" }
                  ]
                }
              }
            ]
          }"#,
        )?;
        let value = Value::Record(vec![(
            "outer_field_1".into(),
            Value::Record(vec![
                (
                    "middle_field_1".into(),
                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.2f64))]),
                ),
                (
                    "middle_field_2".into(),
                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.6f64))]),
                ),
            ]),
        )]);

        assert!(value.validate(&schema));
        Ok(())
    }
1216
1217 #[test]
1218 fn validate() -> TestResult {
1219 let value_schema_valid = vec![
1220 (Value::Int(42), Schema::Int, true, ""),
1221 (Value::Int(43), Schema::Long, true, ""),
1222 (Value::Float(43.2), Schema::Float, true, ""),
1223 (Value::Float(45.9), Schema::Double, true, ""),
1224 (
1225 Value::Int(42),
1226 Schema::Boolean,
1227 false,
1228 "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination! Value: Int(42), schema: Boolean",
1229 ),
1230 (
1231 Value::Union(0, Box::new(Value::Null)),
1232 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1233 true,
1234 "",
1235 ),
1236 (
1237 Value::Union(1, Box::new(Value::Int(42))),
1238 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1239 true,
1240 "",
1241 ),
1242 (
1243 Value::Union(0, Box::new(Value::Null)),
1244 Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int])?),
1245 false,
1246 "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination! Value: Null, schema: Double",
1247 ),
1248 (
1249 Value::Union(3, Box::new(Value::Int(42))),
1250 Schema::Union(
1251 UnionSchema::new(vec![
1252 Schema::Null,
1253 Schema::Double,
1254 Schema::String,
1255 Schema::Int,
1256 ])
1257 ?,
1258 ),
1259 true,
1260 "",
1261 ),
1262 (
1263 Value::Union(1, Box::new(Value::Long(42i64))),
1264 Schema::Union(
1265 UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis])?,
1266 ),
1267 true,
1268 "",
1269 ),
1270 (
1271 Value::Union(2, Box::new(Value::Long(1_i64))),
1272 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1273 false,
1274 "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
1275 ),
1276 (
1277 Value::Array(vec![Value::Long(42i64)]),
1278 Schema::array(Schema::Long),
1279 true,
1280 "",
1281 ),
1282 (
1283 Value::Array(vec![Value::Boolean(true)]),
1284 Schema::array(Schema::Long),
1285 false,
1286 "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long",
1287 ),
1288 (Value::Record(vec![]), Schema::Null, false, "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination! Value: Record([]), schema: Null"),
1289 (
1290 Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
1291 Schema::Duration,
1292 true,
1293 "",
1294 ),
1295 (
1296 Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1297 Schema::Duration,
1298 false,
1299 "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be a Duration",
1300 ),
1301 (
1302 Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
1303 Schema::Record(RecordSchema {
1304 name: Name::new("record_name").unwrap(),
1305 aliases: None,
1306 doc: None,
1307 fields: vec![RecordField {
1308 name: "field_name".to_string(),
1309 doc: None,
1310 default: None,
1311 aliases: None,
1312 schema: Schema::Int,
1313 order: RecordFieldOrder::Ignore,
1314 position: 0,
1315 custom_attributes: Default::default(),
1316 }],
1317 lookup: Default::default(),
1318 attributes: Default::default(),
1319 }),
1320 false,
1321 r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }). Reason: There is no schema field for field 'unknown_field_name'"#,
1322 ),
1323 (
1324 Value::Record(vec![("field_name".to_string(), Value::Null)]),
1325 Schema::Record(RecordSchema {
1326 name: Name::new("record_name").unwrap(),
1327 aliases: None,
1328 doc: None,
1329 fields: vec![RecordField {
1330 name: "field_name".to_string(),
1331 doc: None,
1332 default: None,
1333 aliases: None,
1334 schema: Schema::Ref {
1335 name: Name::new("missing").unwrap(),
1336 },
1337 order: RecordFieldOrder::Ignore,
1338 position: 0,
1339 custom_attributes: Default::default(),
1340 }],
1341 lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
1342 attributes: Default::default(),
1343 }),
1344 false,
1345 r#"Invalid value: Record([("field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }). Reason: Unresolved schema reference: 'Name { name: "missing", namespace: None }'. Parsed names: []"#,
1346 ),
1347 ];
1348
1349 for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
1350 let err_message =
1351 value.validate_internal::<Schema>(&schema, &HashMap::default(), &None);
1352 assert_eq!(valid, err_message.is_none());
1353 if !valid {
1354 let full_err_message = format!(
1355 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1356 value,
1357 schema,
1358 err_message.unwrap()
1359 );
1360 assert_eq!(expected_err_message, full_err_message);
1361 }
1362 }
1363
1364 Ok(())
1365 }
1366
1367 #[test]
1368 fn validate_fixed() -> TestResult {
1369 let schema = Schema::Fixed(FixedSchema {
1370 size: 4,
1371 name: Name::new("some_fixed").unwrap(),
1372 aliases: None,
1373 doc: None,
1374 default: None,
1375 attributes: Default::default(),
1376 });
1377
1378 assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
1379 let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
1380 assert!(!value.validate(&schema));
1381 assert_logged(
1382 format!(
1383 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1384 value, schema, "The value's size (5) is different than the schema's size (4)"
1385 )
1386 .as_str(),
1387 );
1388
1389 assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
1390 let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
1391 assert!(!value.validate(&schema));
1392 assert_logged(
1393 format!(
1394 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1395 value, schema, "The bytes' length (5) is different than the schema's size (4)"
1396 )
1397 .as_str(),
1398 );
1399
1400 Ok(())
1401 }
1402
1403 #[test]
1404 fn validate_enum() -> TestResult {
1405 let schema = Schema::Enum(EnumSchema {
1406 name: Name::new("some_enum").unwrap(),
1407 aliases: None,
1408 doc: None,
1409 symbols: vec![
1410 "spades".to_string(),
1411 "hearts".to_string(),
1412 "diamonds".to_string(),
1413 "clubs".to_string(),
1414 ],
1415 default: None,
1416 attributes: Default::default(),
1417 });
1418
1419 assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
1420 assert!(Value::String("spades".to_string()).validate(&schema));
1421
1422 let value = Value::Enum(1, "spades".to_string());
1423 assert!(!value.validate(&schema));
1424 assert_logged(
1425 format!(
1426 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1427 value, schema, "Symbol 'spades' is not at position '1'"
1428 )
1429 .as_str(),
1430 );
1431
1432 let value = Value::Enum(1000, "spades".to_string());
1433 assert!(!value.validate(&schema));
1434 assert_logged(
1435 format!(
1436 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1437 value, schema, "No symbol at position '1000'"
1438 )
1439 .as_str(),
1440 );
1441
1442 let value = Value::String("lorem".to_string());
1443 assert!(!value.validate(&schema));
1444 assert_logged(
1445 format!(
1446 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1447 value, schema, "'lorem' is not a member of the possible symbols"
1448 )
1449 .as_str(),
1450 );
1451
1452 let other_schema = Schema::Enum(EnumSchema {
1453 name: Name::new("some_other_enum").unwrap(),
1454 aliases: None,
1455 doc: None,
1456 symbols: vec![
1457 "hearts".to_string(),
1458 "diamonds".to_string(),
1459 "clubs".to_string(),
1460 "spades".to_string(),
1461 ],
1462 default: None,
1463 attributes: Default::default(),
1464 });
1465
1466 let value = Value::Enum(0, "spades".to_string());
1467 assert!(!value.validate(&other_schema));
1468 assert_logged(
1469 format!(
1470 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1471 value, other_schema, "Symbol 'spades' is not at position '0'"
1472 )
1473 .as_str(),
1474 );
1475
1476 Ok(())
1477 }
1478
    /// Validates `Record`, `Map` and `Union`-wrapped values against a three-field record
    /// schema and checks the messages logged for each rejected value.
    ///
    /// The `assert_logged` / `assert_not_logged` calls directly follow the validation they
    /// refer to; keep that pairing intact when editing.
    #[test]
    fn validate_record() -> TestResult {
        // Record "some_record" with fields:
        //   a: long   (no default)
        //   b: string (no default)
        //   c: union [null, int], default null
        let schema = Schema::Record(RecordSchema {
            name: Name::new("some_record").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![
                RecordField {
                    name: "a".to_string(),
                    doc: None,
                    default: None,
                    aliases: None,
                    schema: Schema::Long,
                    order: RecordFieldOrder::Ascending,
                    position: 0,
                    custom_attributes: Default::default(),
                },
                RecordField {
                    name: "b".to_string(),
                    doc: None,
                    default: None,
                    aliases: None,
                    schema: Schema::String,
                    order: RecordFieldOrder::Ascending,
                    position: 1,
                    custom_attributes: Default::default(),
                },
                RecordField {
                    name: "c".to_string(),
                    doc: None,
                    default: Some(JsonValue::Null),
                    aliases: None,
                    schema: Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                    order: RecordFieldOrder::Ascending,
                    position: 2,
                    custom_attributes: Default::default(),
                },
            ],
            // Field name -> index into `fields`.
            lookup: [
                ("a".to_string(), 0),
                ("b".to_string(), 1),
                ("c".to_string(), 2),
            ]
            .iter()
            .cloned()
            .collect(),
            attributes: Default::default(),
        });

        // "c" is omitted here and the value is still expected to validate.
        assert!(Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("b".to_string(), Value::String("foo".to_string())),
        ])
        .validate(&schema));

        // Same fields in a different order than the schema declares them.
        let value = Value::Record(vec![
            ("b".to_string(), Value::String("foo".to_string())),
            ("a".to_string(), Value::Long(42i64)),
        ]);
        assert!(value.validate(&schema));

        // Wrong value type for "a".
        let value = Value::Record(vec![
            ("a".to_string(), Value::Boolean(false)),
            ("b".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(false), schema: Long"#,
        );

        // A string for "c" matches neither branch of its [null, int] union.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("c".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Could not find matching type in union"#,
        );
        // Only the record-level message is expected; no separate message for the
        // String-vs-Int mismatch inside the union should have been logged.
        assert_not_logged(
            r#"Invalid value: String("foo") for schema: Int. Reason: Unsupported value-schema combination"#,
        );

        // "d" is not a field of the schema.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("d".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: There is no schema field for field 'd'"#,
        );

        // Four value fields against a three-field schema.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Null),
            ("d".to_string(), Value::Null),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: The value's records length (4) is greater than the schema's (3 fields)"#,
        );

        // A map whose keys are the record's field names is also accepted.
        assert!(Value::Map(
            vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
            ]
            .into_iter()
            .collect()
        )
        .validate(&schema));

        // A map lacking "a" and "b" is rejected. The expected message spans two lines;
        // the second line of the raw string must stay at column 0.
        assert!(!Value::Map(
            vec![("d".to_string(), Value::Long(123_i64)),]
                .into_iter()
                .collect()
        )
        .validate(&schema));
        assert_logged(
            r#"Invalid value: Map({"d": Long(123)}) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Field with name '"a"' is not a member of the map items
Field with name '"b"' is not a member of the map items"#,
        );

        // Wrap the record schema as the second branch of a [null, record] union.
        let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema])?);

        assert!(Value::Union(
            1,
            Box::new(Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
            ]))
        )
        .validate(&union_schema));

        assert!(Value::Union(
            1,
            Box::new(Value::Map(
                vec![
                    ("a".to_string(), Value::Long(42i64)),
                    ("b".to_string(), Value::String("foo".to_string())),
                ]
                .into_iter()
                .collect()
            ))
        )
        .validate(&union_schema));

        Ok(())
    }
1640
1641 #[test]
1642 fn resolve_bytes_ok() -> TestResult {
1643 let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
1644 assert_eq!(
1645 value.resolve(&Schema::Bytes)?,
1646 Value::Bytes(vec![0u8, 42u8])
1647 );
1648
1649 Ok(())
1650 }
1651
1652 #[test]
1653 fn resolve_string_from_bytes() -> TestResult {
1654 let value = Value::Bytes(vec![97, 98, 99]);
1655 assert_eq!(
1656 value.resolve(&Schema::String)?,
1657 Value::String("abc".to_string())
1658 );
1659
1660 Ok(())
1661 }
1662
1663 #[test]
1664 fn resolve_string_from_fixed() -> TestResult {
1665 let value = Value::Fixed(3, vec![97, 98, 99]);
1666 assert_eq!(
1667 value.resolve(&Schema::String)?,
1668 Value::String("abc".to_string())
1669 );
1670
1671 Ok(())
1672 }
1673
1674 #[test]
1675 fn resolve_bytes_failure() {
1676 let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
1677 assert!(value.resolve(&Schema::Bytes).is_err());
1678 }
1679
1680 #[test]
1681 fn resolve_decimal_bytes() -> TestResult {
1682 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1683 value.clone().resolve(&Schema::Decimal(DecimalSchema {
1684 precision: 10,
1685 scale: 4,
1686 inner: Box::new(Schema::Bytes),
1687 }))?;
1688 assert!(value.resolve(&Schema::String).is_err());
1689
1690 Ok(())
1691 }
1692
1693 #[test]
1694 fn resolve_decimal_invalid_scale() {
1695 let value = Value::Decimal(Decimal::from(vec![1, 2]));
1696 assert!(value
1697 .resolve(&Schema::Decimal(DecimalSchema {
1698 precision: 2,
1699 scale: 3,
1700 inner: Box::new(Schema::Bytes),
1701 }))
1702 .is_err());
1703 }
1704
1705 #[test]
1706 fn resolve_decimal_invalid_precision_for_length() {
1707 let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
1708 assert!(value
1709 .resolve(&Schema::Decimal(DecimalSchema {
1710 precision: 1,
1711 scale: 0,
1712 inner: Box::new(Schema::Bytes),
1713 }))
1714 .is_ok());
1715 }
1716
1717 #[test]
1718 fn resolve_decimal_fixed() {
1719 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1720 assert!(value
1721 .clone()
1722 .resolve(&Schema::Decimal(DecimalSchema {
1723 precision: 10,
1724 scale: 1,
1725 inner: Box::new(Schema::Fixed(FixedSchema {
1726 name: Name::new("decimal").unwrap(),
1727 aliases: None,
1728 size: 20,
1729 doc: None,
1730 default: None,
1731 attributes: Default::default(),
1732 }))
1733 }))
1734 .is_ok());
1735 assert!(value.resolve(&Schema::String).is_err());
1736 }
1737
1738 #[test]
1739 fn resolve_date() {
1740 let value = Value::Date(2345);
1741 assert!(value.clone().resolve(&Schema::Date).is_ok());
1742 assert!(value.resolve(&Schema::String).is_err());
1743 }
1744
1745 #[test]
1746 fn resolve_time_millis() {
1747 let value = Value::TimeMillis(10);
1748 assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1749 assert!(value.resolve(&Schema::TimeMicros).is_err());
1750 }
1751
1752 #[test]
1753 fn resolve_time_micros() {
1754 let value = Value::TimeMicros(10);
1755 assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1756 assert!(value.resolve(&Schema::TimeMillis).is_err());
1757 }
1758
1759 #[test]
1760 fn resolve_timestamp_millis() {
1761 let value = Value::TimestampMillis(10);
1762 assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1763 assert!(value.resolve(&Schema::Float).is_err());
1764
1765 let value = Value::Float(10.0f32);
1766 assert!(value.resolve(&Schema::TimestampMillis).is_err());
1767 }
1768
1769 #[test]
1770 fn resolve_timestamp_micros() {
1771 let value = Value::TimestampMicros(10);
1772 assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1773 assert!(value.resolve(&Schema::Int).is_err());
1774
1775 let value = Value::Double(10.0);
1776 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1777 }
1778
1779 #[test]
1780 fn test_avro_3914_resolve_timestamp_nanos() {
1781 let value = Value::TimestampNanos(10);
1782 assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
1783 assert!(value.resolve(&Schema::Int).is_err());
1784
1785 let value = Value::Double(10.0);
1786 assert!(value.resolve(&Schema::TimestampNanos).is_err());
1787 }
1788
1789 #[test]
1790 fn test_avro_3853_resolve_timestamp_millis() {
1791 let value = Value::LocalTimestampMillis(10);
1792 assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
1793 assert!(value.resolve(&Schema::Float).is_err());
1794
1795 let value = Value::Float(10.0f32);
1796 assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
1797 }
1798
1799 #[test]
1800 fn test_avro_3853_resolve_timestamp_micros() {
1801 let value = Value::LocalTimestampMicros(10);
1802 assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
1803 assert!(value.resolve(&Schema::Int).is_err());
1804
1805 let value = Value::Double(10.0);
1806 assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
1807 }
1808
1809 #[test]
1810 fn test_avro_3916_resolve_timestamp_nanos() {
1811 let value = Value::LocalTimestampNanos(10);
1812 assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
1813 assert!(value.resolve(&Schema::Int).is_err());
1814
1815 let value = Value::Double(10.0);
1816 assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
1817 }
1818
1819 #[test]
1820 fn resolve_duration() {
1821 let value = Value::Duration(Duration::new(
1822 Months::new(10),
1823 Days::new(5),
1824 Millis::new(3000),
1825 ));
1826 assert!(value.clone().resolve(&Schema::Duration).is_ok());
1827 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1828 assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
1829 }
1830
1831 #[test]
1832 fn resolve_uuid() -> TestResult {
1833 let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537")?);
1834 assert!(value.clone().resolve(&Schema::Uuid).is_ok());
1835 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1836
1837 Ok(())
1838 }
1839
1840 #[test]
1841 fn avro_3678_resolve_float_to_double() {
1842 let value = Value::Float(2345.1);
1843 assert!(value.resolve(&Schema::Double).is_ok());
1844 }
1845
1846 #[test]
1847 fn test_avro_3621_resolve_to_nullable_union() -> TestResult {
1848 let schema = Schema::parse_str(
1849 r#"{
1850 "type": "record",
1851 "name": "root",
1852 "fields": [
1853 {
1854 "name": "event",
1855 "type": [
1856 "null",
1857 {
1858 "type": "record",
1859 "name": "event",
1860 "fields": [
1861 {
1862 "name": "amount",
1863 "type": "int"
1864 },
1865 {
1866 "name": "size",
1867 "type": [
1868 "null",
1869 "int"
1870 ],
1871 "default": null
1872 }
1873 ]
1874 }
1875 ],
1876 "default": null
1877 }
1878 ]
1879 }"#,
1880 )?;
1881
1882 let value = Value::Record(vec![(
1883 "event".to_string(),
1884 Value::Record(vec![("amount".to_string(), Value::Int(200))]),
1885 )]);
1886 assert!(value.resolve(&schema).is_ok());
1887
1888 let value = Value::Record(vec![(
1889 "event".to_string(),
1890 Value::Record(vec![("size".to_string(), Value::Int(1))]),
1891 )]);
1892 assert!(value.resolve(&schema).is_err());
1893
1894 Ok(())
1895 }
1896
1897 #[test]
1898 fn json_from_avro() -> TestResult {
1899 assert_eq!(JsonValue::try_from(Value::Null)?, JsonValue::Null);
1900 assert_eq!(
1901 JsonValue::try_from(Value::Boolean(true))?,
1902 JsonValue::Bool(true)
1903 );
1904 assert_eq!(
1905 JsonValue::try_from(Value::Int(1))?,
1906 JsonValue::Number(1.into())
1907 );
1908 assert_eq!(
1909 JsonValue::try_from(Value::Long(1))?,
1910 JsonValue::Number(1.into())
1911 );
1912 assert_eq!(
1913 JsonValue::try_from(Value::Float(1.0))?,
1914 JsonValue::Number(Number::from_f64(1.0).unwrap())
1915 );
1916 assert_eq!(
1917 JsonValue::try_from(Value::Double(1.0))?,
1918 JsonValue::Number(Number::from_f64(1.0).unwrap())
1919 );
1920 assert_eq!(
1921 JsonValue::try_from(Value::Bytes(vec![1, 2, 3]))?,
1922 JsonValue::Array(vec![
1923 JsonValue::Number(1.into()),
1924 JsonValue::Number(2.into()),
1925 JsonValue::Number(3.into())
1926 ])
1927 );
1928 assert_eq!(
1929 JsonValue::try_from(Value::String("test".into()))?,
1930 JsonValue::String("test".into())
1931 );
1932 assert_eq!(
1933 JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3]))?,
1934 JsonValue::Array(vec![
1935 JsonValue::Number(1.into()),
1936 JsonValue::Number(2.into()),
1937 JsonValue::Number(3.into())
1938 ])
1939 );
1940 assert_eq!(
1941 JsonValue::try_from(Value::Enum(1, "test_enum".into()))?,
1942 JsonValue::String("test_enum".into())
1943 );
1944 assert_eq!(
1945 JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))?,
1946 JsonValue::String("test_enum".into())
1947 );
1948 assert_eq!(
1949 JsonValue::try_from(Value::Array(vec![
1950 Value::Int(1),
1951 Value::Int(2),
1952 Value::Int(3)
1953 ]))?,
1954 JsonValue::Array(vec![
1955 JsonValue::Number(1.into()),
1956 JsonValue::Number(2.into()),
1957 JsonValue::Number(3.into())
1958 ])
1959 );
1960 assert_eq!(
1961 JsonValue::try_from(Value::Map(
1962 vec![
1963 ("v1".to_string(), Value::Int(1)),
1964 ("v2".to_string(), Value::Int(2)),
1965 ("v3".to_string(), Value::Int(3))
1966 ]
1967 .into_iter()
1968 .collect()
1969 ))?,
1970 JsonValue::Object(
1971 vec![
1972 ("v1".to_string(), JsonValue::Number(1.into())),
1973 ("v2".to_string(), JsonValue::Number(2.into())),
1974 ("v3".to_string(), JsonValue::Number(3.into()))
1975 ]
1976 .into_iter()
1977 .collect()
1978 )
1979 );
1980 assert_eq!(
1981 JsonValue::try_from(Value::Record(vec![
1982 ("v1".to_string(), Value::Int(1)),
1983 ("v2".to_string(), Value::Int(2)),
1984 ("v3".to_string(), Value::Int(3))
1985 ]))?,
1986 JsonValue::Object(
1987 vec![
1988 ("v1".to_string(), JsonValue::Number(1.into())),
1989 ("v2".to_string(), JsonValue::Number(2.into())),
1990 ("v3".to_string(), JsonValue::Number(3.into()))
1991 ]
1992 .into_iter()
1993 .collect()
1994 )
1995 );
1996 assert_eq!(
1997 JsonValue::try_from(Value::Date(1))?,
1998 JsonValue::Number(1.into())
1999 );
2000 assert_eq!(
2001 JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into()))?,
2002 JsonValue::Array(vec![
2003 JsonValue::Number(1.into()),
2004 JsonValue::Number(2.into()),
2005 JsonValue::Number(3.into())
2006 ])
2007 );
2008 assert_eq!(
2009 JsonValue::try_from(Value::TimeMillis(1))?,
2010 JsonValue::Number(1.into())
2011 );
2012 assert_eq!(
2013 JsonValue::try_from(Value::TimeMicros(1))?,
2014 JsonValue::Number(1.into())
2015 );
2016 assert_eq!(
2017 JsonValue::try_from(Value::TimestampMillis(1))?,
2018 JsonValue::Number(1.into())
2019 );
2020 assert_eq!(
2021 JsonValue::try_from(Value::TimestampMicros(1))?,
2022 JsonValue::Number(1.into())
2023 );
2024 assert_eq!(
2025 JsonValue::try_from(Value::TimestampNanos(1))?,
2026 JsonValue::Number(1.into())
2027 );
2028 assert_eq!(
2029 JsonValue::try_from(Value::LocalTimestampMillis(1))?,
2030 JsonValue::Number(1.into())
2031 );
2032 assert_eq!(
2033 JsonValue::try_from(Value::LocalTimestampMicros(1))?,
2034 JsonValue::Number(1.into())
2035 );
2036 assert_eq!(
2037 JsonValue::try_from(Value::LocalTimestampNanos(1))?,
2038 JsonValue::Number(1.into())
2039 );
2040 assert_eq!(
2041 JsonValue::try_from(Value::Duration(
2042 [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
2043 ))?,
2044 JsonValue::Array(vec![
2045 JsonValue::Number(1.into()),
2046 JsonValue::Number(2.into()),
2047 JsonValue::Number(3.into()),
2048 JsonValue::Number(4.into()),
2049 JsonValue::Number(5.into()),
2050 JsonValue::Number(6.into()),
2051 JsonValue::Number(7.into()),
2052 JsonValue::Number(8.into()),
2053 JsonValue::Number(9.into()),
2054 JsonValue::Number(10.into()),
2055 JsonValue::Number(11.into()),
2056 JsonValue::Number(12.into()),
2057 ])
2058 );
2059 assert_eq!(
2060 JsonValue::try_from(Value::Uuid(Uuid::parse_str(
2061 "936DA01F-9ABD-4D9D-80C7-02AF85C822A8"
2062 )?))?,
2063 JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
2064 );
2065
2066 Ok(())
2067 }
2068
2069 #[test]
2070 fn test_avro_3433_recursive_resolves_record() -> TestResult {
2071 let schema = Schema::parse_str(
2072 r#"
2073 {
2074 "type":"record",
2075 "name":"TestStruct",
2076 "fields": [
2077 {
2078 "name":"a",
2079 "type":{
2080 "type":"record",
2081 "name": "Inner",
2082 "fields": [ {
2083 "name":"z",
2084 "type":"int"
2085 }]
2086 }
2087 },
2088 {
2089 "name":"b",
2090 "type":"Inner"
2091 }
2092 ]
2093 }"#,
2094 )?;
2095
2096 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2097 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2098 let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2099 outer
2100 .resolve(&schema)
2101 .expect("Record definition defined in one field must be available in other field");
2102
2103 Ok(())
2104 }
2105
2106 #[test]
2107 fn test_avro_3433_recursive_resolves_array() -> TestResult {
2108 let schema = Schema::parse_str(
2109 r#"
2110 {
2111 "type":"record",
2112 "name":"TestStruct",
2113 "fields": [
2114 {
2115 "name":"a",
2116 "type":{
2117 "type":"array",
2118 "items": {
2119 "type":"record",
2120 "name": "Inner",
2121 "fields": [ {
2122 "name":"z",
2123 "type":"int"
2124 }]
2125 }
2126 }
2127 },
2128 {
2129 "name":"b",
2130 "type": {
2131 "type":"map",
2132 "values":"Inner"
2133 }
2134 }
2135 ]
2136 }"#,
2137 )?;
2138
2139 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2140 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2141 let outer_value = Value::Record(vec![
2142 ("a".into(), Value::Array(vec![inner_value1])),
2143 (
2144 "b".into(),
2145 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2146 ),
2147 ]);
2148 outer_value
2149 .resolve(&schema)
2150 .expect("Record defined in array definition must be resolvable from map");
2151
2152 Ok(())
2153 }
2154
2155 #[test]
2156 fn test_avro_3433_recursive_resolves_map() -> TestResult {
2157 let schema = Schema::parse_str(
2158 r#"
2159 {
2160 "type":"record",
2161 "name":"TestStruct",
2162 "fields": [
2163 {
2164 "name":"a",
2165 "type":{
2166 "type":"record",
2167 "name": "Inner",
2168 "fields": [ {
2169 "name":"z",
2170 "type":"int"
2171 }]
2172 }
2173 },
2174 {
2175 "name":"b",
2176 "type": {
2177 "type":"map",
2178 "values":"Inner"
2179 }
2180 }
2181 ]
2182 }"#,
2183 )?;
2184
2185 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2186 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2187 let outer_value = Value::Record(vec![
2188 ("a".into(), inner_value1),
2189 (
2190 "b".into(),
2191 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2192 ),
2193 ]);
2194 outer_value
2195 .resolve(&schema)
2196 .expect("Record defined in record field must be resolvable from map field");
2197
2198 Ok(())
2199 }
2200
/// A record type (`Inner`) declared inline in field `a` is referenced by name from a
/// field of a second record (`InnerWrapper`) declared in field `b`; a matching value
/// must resolve against the schema.
#[test]
fn test_avro_3433_recursive_resolves_record_wrapper() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }
            },
            {
                "name":"b",
                "type": {
                    "type":"record",
                    "name": "InnerWrapper",
                    "fields": [ {
                        "name":"j",
                        "type":"Inner"
                    }]
                }
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    // Builds an `Inner` record value carrying the given `z`.
    let inner = |z: i32| Value::Record(vec![(String::from("z"), Value::Int(z))]);

    let wrapper = Value::Record(vec![(String::from("j"), inner(6))]);
    let outer_value = Value::Record(vec![
        (String::from("a"), inner(3)),
        (String::from("b"), wrapper),
    ]);

    let resolved = outer_value.resolve(&schema);
    resolved.expect("Record schema defined in field must be resolvable in Record schema defined in other field");

    Ok(())
}
2246
/// A record type (`Inner`) declared inline as the value type of the map in field `a`
/// is referenced by name as the item type of the array in field `b`; a matching value
/// must resolve against the schema.
#[test]
fn test_avro_3433_recursive_resolves_map_and_array() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"map",
                    "values": {
                        "type":"record",
                        "name": "Inner",
                        "fields": [ {
                            "name":"z",
                            "type":"int"
                        }]
                    }
                }
            },
            {
                "name":"b",
                "type": {
                    "type":"array",
                    "items":"Inner"
                }
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    // Builds an `Inner` record value carrying the given `z`.
    let inner = |z: i32| Value::Record(vec![(String::from("z"), Value::Int(z))]);

    let map_of_inner = Value::Map(std::iter::once((String::from("akey"), inner(6))).collect());
    let array_of_inner = Value::Array(vec![inner(3)]);
    let outer_value = Value::Record(vec![
        (String::from("a"), map_of_inner),
        (String::from("b"), array_of_inner),
    ]);

    let resolved = outer_value.resolve(&schema);
    resolved.expect("Record defined in map definition must be resolvable from array");

    Ok(())
}
2295
/// A record type (`Inner`) declared inline inside the `["null", Inner]` union of field
/// `a` is referenced by name as the type of field `b`. Resolution must succeed both
/// when `a` holds an `Inner` record and when it holds `Null`.
#[test]
fn test_avro_3433_recursive_resolves_union() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":["null", {
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }]
            },
            {
                "name":"b",
                "type":"Inner"
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    // Builds an `Inner` record value carrying the given `z`.
    let inner = |z: i32| Value::Record(vec![(String::from("z"), Value::Int(z))]);

    // First the record branch of the union, then the null branch.
    for a_value in [inner(3), Value::Null] {
        let outer = Value::Record(vec![
            (String::from("a"), a_value),
            (String::from("b"), inner(6)),
        ]);
        outer
            .resolve(&schema)
            .expect("Record definition defined in union must be resolved in other field");
    }

    Ok(())
}
2339
/// Three levels of nested records where only the outermost record declares a namespace
/// (`space`). `outer_field_2` refers to the innermost record as
/// `space.inner_record_name`; values exercising every union branch must resolve.
#[test]
fn test_avro_3461_test_multi_level_resolve_outer_namespace() -> TestResult {
    let schema = Schema::parse_str(
        r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "middle_record_name",
                  "fields":[
                    {
                      "name":"middle_field_1",
                      "type":[
                        "null",
                        {
                          "type":"record",
                          "name":"inner_record_name",
                          "fields":[
                            {
                              "name":"inner_field_1",
                              "type":"double"
                            }
                          ]
                        }
                      ]
                    }
                  ]
                }
              ]
            },
            {
              "name": "outer_field_2",
              "type" : "space.inner_record_name"
            }
          ]
        }
        "#,
    )?;

    // Small builders for the value shapes used below.
    let inner_record =
        || Value::Record(vec![(String::from("inner_field_1"), Value::Double(5.4))]);
    let null_branch = || Value::Union(0, Box::new(Value::Null));
    let middle_record =
        |field: Value| Value::Record(vec![(String::from("middle_field_1"), field)]);
    let outer_record = |field_1: Value| {
        Value::Record(vec![
            (String::from("outer_field_1"), field_1),
            (String::from("outer_field_2"), inner_record()),
        ])
    };

    let variations = [
        // outer_field_1 is null
        outer_record(null_branch()),
        // outer_field_1 is a middle record whose field is null
        outer_record(Value::Union(1, Box::new(middle_record(null_branch())))),
        // outer_field_1 is a middle record whose field is an inner record
        outer_record(Value::Union(
            1,
            Box::new(middle_record(Value::Union(1, Box::new(inner_record())))),
        )),
    ];

    for variation in variations {
        variation
            .resolve(&schema)
            .expect("Should be able to resolve value to the schema that is it's definition");
    }

    Ok(())
}
2427
/// Three levels of nested records where the outer record declares namespace `space`
/// and the middle record declares `middle_namespace`; the innermost record declares
/// none. `outer_field_2` refers to the innermost record as
/// `middle_namespace.inner_record_name`; values exercising every union branch must
/// resolve.
#[test]
fn test_avro_3461_test_multi_level_resolve_middle_namespace() -> TestResult {
    let schema = Schema::parse_str(
        r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "middle_record_name",
                  "namespace":"middle_namespace",
                  "fields":[
                    {
                      "name":"middle_field_1",
                      "type":[
                        "null",
                        {
                          "type":"record",
                          "name":"inner_record_name",
                          "fields":[
                            {
                              "name":"inner_field_1",
                              "type":"double"
                            }
                          ]
                        }
                      ]
                    }
                  ]
                }
              ]
            },
            {
              "name": "outer_field_2",
              "type" : "middle_namespace.inner_record_name"
            }
          ]
        }
        "#,
    )?;

    // Small builders for the value shapes used below.
    let inner_record =
        || Value::Record(vec![(String::from("inner_field_1"), Value::Double(5.4))]);
    let null_branch = || Value::Union(0, Box::new(Value::Null));
    let middle_record =
        |field: Value| Value::Record(vec![(String::from("middle_field_1"), field)]);
    let outer_record = |field_1: Value| {
        Value::Record(vec![
            (String::from("outer_field_1"), field_1),
            (String::from("outer_field_2"), inner_record()),
        ])
    };

    let variations = [
        // outer_field_1 is null
        outer_record(null_branch()),
        // outer_field_1 is a middle record whose field is null
        outer_record(Value::Union(1, Box::new(middle_record(null_branch())))),
        // outer_field_1 is a middle record whose field is an inner record
        outer_record(Value::Union(
            1,
            Box::new(middle_record(Value::Union(1, Box::new(inner_record())))),
        )),
    ];

    for variation in variations {
        variation
            .resolve(&schema)
            .expect("Should be able to resolve value to the schema that is it's definition");
    }

    Ok(())
}
2516
/// Three levels of nested records, each declaring its own namespace (`space`,
/// `middle_namespace`, `inner_namespace`). `outer_field_2` refers to the innermost
/// record as `inner_namespace.inner_record_name`; values exercising every union branch
/// must resolve.
#[test]
fn test_avro_3461_test_multi_level_resolve_inner_namespace() -> TestResult {
    let schema = Schema::parse_str(
        r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "middle_record_name",
                  "namespace":"middle_namespace",
                  "fields":[
                    {
                      "name":"middle_field_1",
                      "type":[
                        "null",
                        {
                          "type":"record",
                          "name":"inner_record_name",
                          "namespace":"inner_namespace",
                          "fields":[
                            {
                              "name":"inner_field_1",
                              "type":"double"
                            }
                          ]
                        }
                      ]
                    }
                  ]
                }
              ]
            },
            {
              "name": "outer_field_2",
              "type" : "inner_namespace.inner_record_name"
            }
          ]
        }
        "#,
    )?;

    // Small builders for the value shapes used below.
    let inner_record =
        || Value::Record(vec![(String::from("inner_field_1"), Value::Double(5.4))]);
    let null_branch = || Value::Union(0, Box::new(Value::Null));
    let middle_record =
        |field: Value| Value::Record(vec![(String::from("middle_field_1"), field)]);
    let outer_record = |field_1: Value| {
        Value::Record(vec![
            (String::from("outer_field_1"), field_1),
            (String::from("outer_field_2"), inner_record()),
        ])
    };

    let variations = [
        // outer_field_1 is null
        outer_record(null_branch()),
        // outer_field_1 is a middle record whose field is null
        outer_record(Value::Union(1, Box::new(middle_record(null_branch())))),
        // outer_field_1 is a middle record whose field is an inner record
        outer_record(Value::Union(
            1,
            Box::new(middle_record(Value::Union(1, Box::new(inner_record())))),
        )),
    ];

    for variation in variations {
        variation
            .resolve(&schema)
            .expect("Should be able to resolve value to the schema that is it's definition");
    }

    Ok(())
}
2607
/// Field `b` refers by name to the `Inner` record declared in field `a`. Validation
/// must reject outer records whose `b` value does not match `Inner`, even though `a`
/// is valid.
#[test]
fn test_avro_3460_validation_with_refs() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }
            },
            {
                "name":"b",
                "type":"Inner"
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    let valid_inner = || Value::Record(vec![(String::from("z"), Value::Int(3))]);
    // `z` holds a null where the schema declares an int.
    let null_z = Value::Record(vec![(String::from("z"), Value::Null)]);
    // Carries a string field `a` instead of the int field `z`.
    let wrong_field = Value::Record(vec![(
        String::from("a"),
        Value::String(String::from("testing")),
    )]);

    for bad_b in [null_z, wrong_field] {
        let outer = Value::Record(vec![
            (String::from("a"), valid_inner()),
            (String::from("b"), bad_b),
        ]);
        assert!(
            !outer.validate(&schema),
            "field b record is invalid against the schema"
        );
    }

    Ok(())
}
2659
/// Same schema as the value-based variant (field `b` refers by name to `Inner` from
/// field `a`), but the values come from serializing Rust structs. Each struct puts
/// something other than a plain `TestInner` into `b`, so validation must fail.
#[test]
fn test_avro_3460_validation_with_refs_real_struct() -> TestResult {
    use crate::ser::Serializer;
    use serde::Serialize;

    #[derive(Serialize, Clone)]
    struct TestInner {
        z: i32,
    }

    // `b` is a string.
    #[derive(Serialize)]
    struct TestRefSchemaStruct1 {
        a: TestInner,
        b: String,
    }

    // `b` is an integer.
    #[derive(Serialize)]
    struct TestRefSchemaStruct2 {
        a: TestInner,
        b: i32,
    }

    // `b` is optional; the schema declares a plain `Inner`, not a union.
    #[derive(Serialize)]
    struct TestRefSchemaStruct3 {
        a: TestInner,
        b: Option<TestInner>,
    }

    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }
            },
            {
                "name":"b",
                "type":"Inner"
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    let inner = TestInner { z: 3 };

    // Each struct is serialized with its own fresh serializer.
    let with_string_b: Value = TestRefSchemaStruct1 {
        a: inner.clone(),
        b: "testing".into(),
    }
    .serialize(&mut Serializer::default())?;
    let with_int_b: Value = TestRefSchemaStruct2 {
        a: inner.clone(),
        b: 24,
    }
    .serialize(&mut Serializer::default())?;
    let with_none_b: Value = TestRefSchemaStruct3 { a: inner, b: None }
        .serialize(&mut Serializer::default())?;

    for invalid in [with_string_b, with_int_b, with_none_b] {
        assert!(
            !invalid.validate(&schema),
            "field b record is invalid against the schema"
        );
    }

    Ok(())
}
2749
2750 fn avro_3674_with_or_without_namespace(with_namespace: bool) -> TestResult {
2751 use crate::ser::Serializer;
2752 use serde::Serialize;
2753
2754 let schema_str = r#"{
2755 "type": "record",
2756 "name": "NamespacedMessage",
2757 [NAMESPACE]
2758 "fields": [
2759 {
2760 "type": "record",
2761 "name": "field_a",
2762 "fields": [
2763 {
2764 "name": "enum_a",
2765 "type": {
2766 "type": "enum",
2767 "name": "EnumType",
2768 "symbols": [
2769 "SYMBOL_1",
2770 "SYMBOL_2"
2771 ],
2772 "default": "SYMBOL_1"
2773 }
2774 },
2775 {
2776 "name": "enum_b",
2777 "type": "EnumType"
2778 }
2779 ]
2780 }
2781 ]
2782 }"#;
2783 let schema_str = schema_str.replace(
2784 "[NAMESPACE]",
2785 if with_namespace {
2786 r#""namespace": "com.domain","#
2787 } else {
2788 ""
2789 },
2790 );
2791
2792 let schema = Schema::parse_str(&schema_str)?;
2793
2794 #[derive(Serialize)]
2795 enum EnumType {
2796 #[serde(rename = "SYMBOL_1")]
2797 Symbol1,
2798 #[serde(rename = "SYMBOL_2")]
2799 Symbol2,
2800 }
2801
2802 #[derive(Serialize)]
2803 struct FieldA {
2804 enum_a: EnumType,
2805 enum_b: EnumType,
2806 }
2807
2808 #[derive(Serialize)]
2809 struct NamespacedMessage {
2810 field_a: FieldA,
2811 }
2812
2813 let msg = NamespacedMessage {
2814 field_a: FieldA {
2815 enum_a: EnumType::Symbol2,
2816 enum_b: EnumType::Symbol1,
2817 },
2818 };
2819
2820 let mut ser = Serializer::default();
2821 let test_value: Value = msg.serialize(&mut ser)?;
2822 assert!(test_value.validate(&schema), "test_value should validate");
2823 assert!(
2824 test_value.resolve(&schema).is_ok(),
2825 "test_value should resolve"
2826 );
2827
2828 Ok(())
2829 }
2830
/// Runs the shared `avro_3674` scenario with no namespace entry in the schema.
#[test]
fn test_avro_3674_validate_no_namespace_resolution() -> TestResult {
    let with_namespace = false;
    avro_3674_with_or_without_namespace(with_namespace)
}
2835
/// Runs the shared `avro_3674` scenario with a namespace declared on the top-level record.
#[test]
fn test_avro_3674_validate_with_namespace_resolution() -> TestResult {
    let with_namespace = true;
    avro_3674_with_or_without_namespace(with_namespace)
}
2840
2841 fn avro_3688_schema_resolution_panic(set_field_b: bool) -> TestResult {
2842 use crate::ser::Serializer;
2843 use serde::{Deserialize, Serialize};
2844
2845 let schema_str = r#"{
2846 "type": "record",
2847 "name": "Message",
2848 "fields": [
2849 {
2850 "name": "field_a",
2851 "type": [
2852 "null",
2853 {
2854 "name": "Inner",
2855 "type": "record",
2856 "fields": [
2857 {
2858 "name": "inner_a",
2859 "type": "string"
2860 }
2861 ]
2862 }
2863 ],
2864 "default": null
2865 },
2866 {
2867 "name": "field_b",
2868 "type": [
2869 "null",
2870 "Inner"
2871 ],
2872 "default": null
2873 }
2874 ]
2875 }"#;
2876
2877 #[derive(Serialize, Deserialize)]
2878 struct Inner {
2879 inner_a: String,
2880 }
2881
2882 #[derive(Serialize, Deserialize)]
2883 struct Message {
2884 field_a: Option<Inner>,
2885 field_b: Option<Inner>,
2886 }
2887
2888 let schema = Schema::parse_str(schema_str)?;
2889
2890 let msg = Message {
2891 field_a: Some(Inner {
2892 inner_a: "foo".to_string(),
2893 }),
2894 field_b: if set_field_b {
2895 Some(Inner {
2896 inner_a: "bar".to_string(),
2897 })
2898 } else {
2899 None
2900 },
2901 };
2902
2903 let mut ser = Serializer::default();
2904 let test_value: Value = msg.serialize(&mut ser)?;
2905 assert!(test_value.validate(&schema), "test_value should validate");
2906 assert!(
2907 test_value.resolve(&schema).is_ok(),
2908 "test_value should resolve"
2909 );
2910
2911 Ok(())
2912 }
2913
/// Runs the shared `avro_3688` scenario with `field_b` left as `None`.
#[test]
fn test_avro_3688_field_b_not_set() -> TestResult {
    let set_field_b = false;
    avro_3688_schema_resolution_panic(set_field_b)
}
2918
/// Runs the shared `avro_3688` scenario with `field_b` populated.
#[test]
fn test_avro_3688_field_b_set() -> TestResult {
    let set_field_b = true;
    avro_3688_schema_resolution_panic(set_field_b)
}
2923
/// The main record schema refers to an enum that is defined in a separate schema.
/// Resolving with the extra schemata supplied must succeed, while resolving against
/// the main schema alone must fail.
#[test]
fn test_avro_3764_use_resolve_schemata() -> TestResult {
    let enum_json = r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
    let record_json = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": "enumForReference"}]}"#;

    let json_value: serde_json::Value = serde_json::from_str(
        r#"
        {
            "reference": "A"
        }
        "#,
    )?;
    let avro_value = Value::from(json_value);

    // The main schema is parsed first; everything after it is supporting schemata.
    let parsed = Schema::parse_list(&[record_json, enum_json])?;
    let (main_schema, rest) = parsed.split_first().unwrap();
    let schemata: Vec<_> = rest.iter().collect();

    let with_schemata = avro_value.clone().resolve_schemata(main_schema, schemata);
    assert!(
        with_schemata.is_ok(),
        "result of resolving with schemata should be ok, got: {:?}",
        with_schemata
    );

    let without_schemata = avro_value.resolve(main_schema);
    assert!(
        without_schemata.is_err(),
        "result of resolving without schemata should be err, got: {:?}",
        without_schemata
    );

    Ok(())
}
2962
/// The main record holds a `["null", "recordForReference"]` union; the referenced
/// record in turn refers to a separately defined enum. Resolving with the two
/// supporting schemata must succeed, and the resolved value must validate against the
/// full list of schemata.
#[test]
fn test_avro_3767_union_resolve_complex_refs() -> TestResult {
    let enum_json = r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
    let record_json = r#"{"name": "recordForReference", "type": "record", "fields": [{"name": "refInRecord", "type": "enumForReference"}]}"#;
    let main_json = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": ["null", "recordForReference"]}]}"#;

    let json_value: serde_json::Value = serde_json::from_str(
        r#"
        {
            "reference": {
                "refInRecord": "A"
            }
        }
        "#,
    )?;
    let avro_value = Value::from(json_value);

    // The main schema is parsed last; the two before it are the supporting schemata.
    let schemata = Schema::parse_list(&[enum_json, record_json, main_json])?;
    let main_schema = schemata.last().unwrap();
    let supporting: Vec<&Schema> = schemata.iter().take(2).collect();

    let resolve_result = avro_value.resolve_schemata(main_schema, supporting);
    assert!(
        resolve_result.is_ok(),
        "result of resolving with schemata should be ok, got: {:?}",
        resolve_result
    );

    let all_schemata = schemata.iter().collect();
    assert!(
        resolve_result?.validate_schemata(all_schemata),
        "result of validation with schemata should be true"
    );

    Ok(())
}
3002
/// A decimal value built from the big-endian signed bytes of `12345678` must resolve
/// against a fixed-backed decimal schema with precision 8, scale 0 and size 8.
#[test]
fn test_avro_3782_incorrect_decimal_resolving() -> TestResult {
    let unscaled_bytes = BigInt::from(12345678u32).to_signed_bytes_be();
    let avro_value = Value::Decimal(Decimal::from(unscaled_bytes));

    let schema = Schema::parse_str(
        r#"{"name": "decimalSchema", "logicalType": "decimal", "type": "fixed", "precision": 8, "scale": 0, "size": 8}"#,
    )?;

    let resolve_result = avro_value.resolve(&schema);
    assert!(
        resolve_result.is_ok(),
        "resolve result must be ok, got: {resolve_result:?}"
    );

    Ok(())
}
3019
/// A `BigDecimal` value must resolve against a bytes schema carrying the
/// `big-decimal` logical type.
#[test]
fn test_avro_3779_bigdecimal_resolving() -> TestResult {
    let avro_value = Value::BigDecimal(BigDecimal::from(12345678u32));

    let schema = Schema::parse_str(
        r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#,
    )?;

    let resolve_result: AvroResult<Value> = avro_value.resolve(&schema);
    assert!(
        resolve_result.is_ok(),
        "resolve result must be ok, got: {resolve_result:?}"
    );

    Ok(())
}
3035
/// Resolving `Value::Bytes` against a fixed schema of size 3 yields `Value::Fixed`
/// when the byte count is exactly 3, and an error when it is shorter or longer.
#[test]
fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult {
    // One schema instance is enough; `resolve` only borrows it.
    let fixed_of_three = Schema::Fixed(FixedSchema {
        name: "test".into(),
        aliases: None,
        doc: None,
        size: 3,
        default: None,
        attributes: Default::default(),
    });

    let exact_fit = Value::Bytes(vec![97, 98, 99]).resolve(&fixed_of_three)?;
    assert_eq!(exact_fit, Value::Fixed(3, vec![97, 98, 99]));

    // Too short, then too long.
    for wrong_length in [vec![97, 99], vec![97, 98, 99, 100]] {
        let outcome = Value::Bytes(wrong_length).resolve(&fixed_of_three);
        assert!(outcome.is_err(),);
    }

    Ok(())
}
3077
/// Checks `Value::from(serde_json::Value)` for null, booleans, integers at and just
/// beyond the `i32` bounds, floats, unsigned extremes, strings, arrays and objects.
#[test]
fn avro_3928_from_serde_value_to_types_value() {
    let mut expected_map = std::collections::HashMap::new();
    expected_map.insert(String::from("key1"), Value::String(String::from("value1")));
    expected_map.insert(String::from("key2"), Value::String(String::from("value2")));

    // (JSON input, expected Avro value) pairs, checked in order.
    let cases = vec![
        (serde_json::Value::Null, Value::Null),
        (json!(true), Value::Boolean(true)),
        (json!(false), Value::Boolean(false)),
        (json!(0), Value::Int(0)),
        (json!(i32::MIN), Value::Int(i32::MIN)),
        (json!(i32::MAX), Value::Int(i32::MAX)),
        // One step outside the i32 range on either side is expected as a Long.
        (
            json!(i32::MIN as i64 - 1),
            Value::Long(i32::MIN as i64 - 1),
        ),
        (
            json!(i32::MAX as i64 + 1),
            Value::Long(i32::MAX as i64 + 1),
        ),
        (json!(1.23), Value::Double(1.23)),
        (json!(-1.23), Value::Double(-1.23)),
        (json!(u64::MIN), Value::Int(u64::MIN as i32)),
        // `u64::MAX as i64` wraps, so the expected value here is `Value::Long(-1)`.
        (json!(u64::MAX), Value::Long(u64::MAX as i64)),
        (json!("some text"), Value::String("some text".into())),
        (
            json!(["text1", "text2", "text3"]),
            Value::Array(vec![
                Value::String("text1".into()),
                Value::String("text2".into()),
                Value::String("text3".into()),
            ]),
        ),
        (
            json!({"key1": "value1", "key2": "value2"}),
            Value::Map(expected_map),
        ),
    ];

    for (input, expected) in cases {
        assert_eq!(Value::from(input), expected);
    }
}
3122}