1use crate::{
20 bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
21 decimal::Decimal,
22 duration::Duration,
23 schema::{
24 DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
25 RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
26 },
27 AvroResult, Error,
28};
29use bigdecimal::BigDecimal;
30use log::{debug, error};
31use serde_json::{Number, Value as JsonValue};
32use std::{
33 borrow::Borrow,
34 collections::{BTreeMap, HashMap},
35 fmt::Debug,
36 hash::BuildHasher,
37 str::FromStr,
38};
39use uuid::Uuid;
40
41fn max_prec_for_len(len: usize) -> Result<usize, Error> {
43 let len = i32::try_from(len).map_err(|e| Error::ConvertLengthToI32(e, len))?;
44 Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
45}
46
47#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
52#[strum_discriminants(name(ValueKind))]
53pub enum Value {
54 Null,
56 Boolean(bool),
58 Int(i32),
60 Long(i64),
62 Float(f32),
64 Double(f64),
66 Bytes(Vec<u8>),
68 String(String),
70 Fixed(usize, Vec<u8>),
73 Enum(u32, String),
80 Union(u32, Box<Value>),
87 Array(Vec<Value>),
89 Map(HashMap<String, Value>),
91 Record(Vec<(String, Value)>),
98 Date(i32),
103 Decimal(Decimal),
105 BigDecimal(BigDecimal),
107 TimeMillis(i32),
109 TimeMicros(i64),
111 TimestampMillis(i64),
113 TimestampMicros(i64),
115 TimestampNanos(i64),
117 LocalTimestampMillis(i64),
119 LocalTimestampMicros(i64),
121 LocalTimestampNanos(i64),
123 Duration(Duration),
125 Uuid(Uuid),
127}
128
129#[deprecated(
132 since = "0.11.0",
133 note = "Please use Value::from, Into::into or value.into() instead"
134)]
135pub trait ToAvro {
136 fn avro(self) -> Value;
138}
139
140#[allow(deprecated)]
141impl<T: Into<Value>> ToAvro for T {
142 fn avro(self) -> Value {
143 self.into()
144 }
145}
146
147macro_rules! to_value(
148 ($type:ty, $variant_constructor:expr) => (
149 impl From<$type> for Value {
150 fn from(value: $type) -> Self {
151 $variant_constructor(value)
152 }
153 }
154 );
155);
156
157to_value!(bool, Value::Boolean);
158to_value!(i32, Value::Int);
159to_value!(i64, Value::Long);
160to_value!(f32, Value::Float);
161to_value!(f64, Value::Double);
162to_value!(String, Value::String);
163to_value!(Vec<u8>, Value::Bytes);
164to_value!(uuid::Uuid, Value::Uuid);
165to_value!(Decimal, Value::Decimal);
166to_value!(BigDecimal, Value::BigDecimal);
167to_value!(Duration, Value::Duration);
168
169impl From<()> for Value {
170 fn from(_: ()) -> Self {
171 Self::Null
172 }
173}
174
175impl From<usize> for Value {
176 fn from(value: usize) -> Self {
177 i64::try_from(value)
178 .expect("cannot convert usize to i64")
179 .into()
180 }
181}
182
183impl From<&str> for Value {
184 fn from(value: &str) -> Self {
185 Self::String(value.to_owned())
186 }
187}
188
189impl From<&[u8]> for Value {
190 fn from(value: &[u8]) -> Self {
191 Self::Bytes(value.to_owned())
192 }
193}
194
195impl<T> From<Option<T>> for Value
196where
197 T: Into<Self>,
198{
199 fn from(value: Option<T>) -> Self {
200 Self::Union(
202 value.is_some() as u32,
203 Box::new(value.map_or_else(|| Self::Null, Into::into)),
204 )
205 }
206}
207
208impl<K, V, S> From<HashMap<K, V, S>> for Value
209where
210 K: Into<String>,
211 V: Into<Self>,
212 S: BuildHasher,
213{
214 fn from(value: HashMap<K, V, S>) -> Self {
215 Self::Map(
216 value
217 .into_iter()
218 .map(|(key, value)| (key.into(), value.into()))
219 .collect(),
220 )
221 }
222}
223
224#[derive(Debug, Clone)]
226pub struct Record<'a> {
227 pub fields: Vec<(String, Value)>,
231 schema_lookup: &'a BTreeMap<String, usize>,
232}
233
234impl Record<'_> {
235 pub fn new(schema: &Schema) -> Option<Record> {
239 match *schema {
240 Schema::Record(RecordSchema {
241 fields: ref schema_fields,
242 lookup: ref schema_lookup,
243 ..
244 }) => {
245 let mut fields = Vec::with_capacity(schema_fields.len());
246 for schema_field in schema_fields.iter() {
247 fields.push((schema_field.name.clone(), Value::Null));
248 }
249
250 Some(Record {
251 fields,
252 schema_lookup,
253 })
254 }
255 _ => None,
256 }
257 }
258
259 pub fn put<V>(&mut self, field: &str, value: V)
265 where
266 V: Into<Value>,
267 {
268 if let Some(&position) = self.schema_lookup.get(field) {
269 self.fields[position].1 = value.into()
270 }
271 }
272
273 pub fn get(&self, field: &str) -> Option<&Value> {
276 self.schema_lookup
277 .get(field)
278 .map(|&position| &self.fields[position].1)
279 }
280}
281
282impl<'a> From<Record<'a>> for Value {
283 fn from(value: Record<'a>) -> Self {
284 Self::Record(value.fields)
285 }
286}
287
288impl From<JsonValue> for Value {
289 fn from(value: JsonValue) -> Self {
290 match value {
291 JsonValue::Null => Self::Null,
292 JsonValue::Bool(b) => b.into(),
293 JsonValue::Number(ref n) if n.is_i64() => {
294 let n = n.as_i64().unwrap();
295 if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
296 Value::Int(n as i32)
297 } else {
298 Value::Long(n)
299 }
300 }
301 JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
302 JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), JsonValue::String(s) => s.into(),
304 JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
305 JsonValue::Object(items) => Value::Map(
306 items
307 .into_iter()
308 .map(|(key, value)| (key, value.into()))
309 .collect(),
310 ),
311 }
312 }
313}
314
315impl TryFrom<Value> for JsonValue {
317 type Error = crate::error::Error;
318 fn try_from(value: Value) -> AvroResult<Self> {
319 match value {
320 Value::Null => Ok(Self::Null),
321 Value::Boolean(b) => Ok(Self::Bool(b)),
322 Value::Int(i) => Ok(Self::Number(i.into())),
323 Value::Long(l) => Ok(Self::Number(l.into())),
324 Value::Float(f) => Number::from_f64(f.into())
325 .map(Self::Number)
326 .ok_or_else(|| Error::ConvertF64ToJson(f.into())),
327 Value::Double(d) => Number::from_f64(d)
328 .map(Self::Number)
329 .ok_or(Error::ConvertF64ToJson(d)),
330 Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
331 Value::String(s) => Ok(Self::String(s)),
332 Value::Fixed(_size, items) => {
333 Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
334 }
335 Value::Enum(_i, s) => Ok(Self::String(s)),
336 Value::Union(_i, b) => Self::try_from(*b),
337 Value::Array(items) => items
338 .into_iter()
339 .map(Self::try_from)
340 .collect::<Result<Vec<_>, _>>()
341 .map(Self::Array),
342 Value::Map(items) => items
343 .into_iter()
344 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
345 .collect::<Result<Vec<_>, _>>()
346 .map(|v| Self::Object(v.into_iter().collect())),
347 Value::Record(items) => items
348 .into_iter()
349 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
350 .collect::<Result<Vec<_>, _>>()
351 .map(|v| Self::Object(v.into_iter().collect())),
352 Value::Date(d) => Ok(Self::Number(d.into())),
353 Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
354 .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
355 Value::BigDecimal(ref bg) => {
356 let vec1: Vec<u8> = serialize_big_decimal(bg)?;
357 Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
358 }
359 Value::TimeMillis(t) => Ok(Self::Number(t.into())),
360 Value::TimeMicros(t) => Ok(Self::Number(t.into())),
361 Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
362 Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
363 Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
364 Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
365 Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
366 Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
367 Value::Duration(d) => Ok(Self::Array(
368 <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
369 )),
370 Value::Uuid(uuid) => Ok(Self::String(uuid.as_hyphenated().to_string())),
371 }
372 }
373}
374
375impl Value {
376 pub fn validate(&self, schema: &Schema) -> bool {
381 self.validate_schemata(vec![schema])
382 }
383
384 pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
385 let rs = ResolvedSchema::try_from(schemata.clone())
386 .expect("Schemata didn't successfully resolve");
387 let schemata_len = schemata.len();
388 schemata.iter().any(|schema| {
389 let enclosing_namespace = schema.namespace();
390
391 match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
392 Some(reason) => {
393 let log_message =
394 format!("Invalid value: {self:?} for schema: {schema:?}. Reason: {reason}");
395 if schemata_len == 1 {
396 error!("{log_message}");
397 } else {
398 debug!("{log_message}");
399 };
400 false
401 }
402 None => true,
403 }
404 })
405 }
406
407 fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
408 match (accumulator, other) {
409 (None, None) => None,
410 (None, s @ Some(_)) => s,
411 (s @ Some(_), None) => s,
412 (Some(reason1), Some(reason2)) => Some(format!("{reason1}\n{reason2}")),
413 }
414 }
415
416 pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
418 &self,
419 schema: &Schema,
420 names: &HashMap<Name, S>,
421 enclosing_namespace: &Namespace,
422 ) -> Option<String> {
423 match (self, schema) {
424 (_, Schema::Ref { name }) => {
425 let name = name.fully_qualified_name(enclosing_namespace);
426 names.get(&name).map_or_else(
427 || {
428 Some(format!(
429 "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
430 name,
431 names.keys()
432 ))
433 },
434 |s| self.validate_internal(s.borrow(), names, &name.namespace),
435 )
436 }
437 (&Value::Null, &Schema::Null) => None,
438 (&Value::Boolean(_), &Schema::Boolean) => None,
439 (&Value::Int(_), &Schema::Int) => None,
440 (&Value::Int(_), &Schema::Date) => None,
441 (&Value::Int(_), &Schema::TimeMillis) => None,
442 (&Value::Int(_), &Schema::Long) => None,
443 (&Value::Long(_), &Schema::Long) => None,
444 (&Value::Long(_), &Schema::TimeMicros) => None,
445 (&Value::Long(_), &Schema::TimestampMillis) => None,
446 (&Value::Long(_), &Schema::TimestampMicros) => None,
447 (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
448 (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
449 (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
450 (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
451 (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
452 (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
453 (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
454 (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
455 (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
456 (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
457 (&Value::Date(_), &Schema::Date) => None,
458 (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
459 (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
460 (&Value::Duration(_), &Schema::Duration) => None,
461 (&Value::Uuid(_), &Schema::Uuid) => None,
462 (&Value::Float(_), &Schema::Float) => None,
463 (&Value::Float(_), &Schema::Double) => None,
464 (&Value::Double(_), &Schema::Double) => None,
465 (&Value::Bytes(_), &Schema::Bytes) => None,
466 (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
467 (&Value::String(_), &Schema::String) => None,
468 (&Value::String(_), &Schema::Uuid) => None,
469 (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => {
470 if n != size {
471 Some(format!(
472 "The value's size ({n}) is different than the schema's size ({size})"
473 ))
474 } else {
475 None
476 }
477 }
478 (Value::Bytes(b), &Schema::Fixed(FixedSchema { size, .. })) => {
479 if b.len() != size {
480 Some(format!(
481 "The bytes' length ({}) is different than the schema's size ({})",
482 b.len(),
483 size
484 ))
485 } else {
486 None
487 }
488 }
489 (&Value::Fixed(n, _), &Schema::Duration) => {
490 if n != 12 {
491 Some(format!(
492 "The value's size ('{n}') must be exactly 12 to be a Duration"
493 ))
494 } else {
495 None
496 }
497 }
498 (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
500 (Value::String(s), Schema::Enum(EnumSchema { symbols, .. })) => {
501 if !symbols.contains(s) {
502 Some(format!("'{s}' is not a member of the possible symbols"))
503 } else {
504 None
505 }
506 }
507 (
508 &Value::Enum(i, ref s),
509 Schema::Enum(EnumSchema {
510 symbols, default, ..
511 }),
512 ) => symbols
513 .get(i as usize)
514 .map(|ref symbol| {
515 if symbol != &s {
516 Some(format!("Symbol '{s}' is not at position '{i}'"))
517 } else {
518 None
519 }
520 })
521 .unwrap_or_else(|| match default {
522 Some(_) => None,
523 None => Some(format!("No symbol at position '{i}'")),
524 }),
525 (&Value::Union(i, ref value), Schema::Union(inner)) => inner
527 .variants()
528 .get(i as usize)
529 .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
530 .unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
531 (v, Schema::Union(inner)) => {
532 match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
533 Some(_) => None,
534 None => Some("Could not find matching type in union".to_string()),
535 }
536 }
537 (Value::Array(items), Schema::Array(inner)) => items.iter().fold(None, |acc, item| {
538 Value::accumulate(
539 acc,
540 item.validate_internal(&inner.items, names, enclosing_namespace),
541 )
542 }),
543 (Value::Map(items), Schema::Map(inner)) => {
544 items.iter().fold(None, |acc, (_, value)| {
545 Value::accumulate(
546 acc,
547 value.validate_internal(&inner.types, names, enclosing_namespace),
548 )
549 })
550 }
551 (
552 Value::Record(record_fields),
553 Schema::Record(RecordSchema {
554 fields,
555 lookup,
556 name,
557 ..
558 }),
559 ) => {
560 let non_nullable_fields_count =
561 fields.iter().filter(|&rf| !rf.is_nullable()).count();
562
563 if record_fields.len() < non_nullable_fields_count {
565 return Some(format!(
566 "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
567 record_fields.len(),
568 non_nullable_fields_count
569 ));
570 } else if record_fields.len() > fields.len() {
571 return Some(format!(
572 "The value's records length ({}) is greater than the schema's ({} fields)",
573 record_fields.len(),
574 fields.len(),
575 ));
576 }
577
578 record_fields
579 .iter()
580 .fold(None, |acc, (field_name, record_field)| {
581 let record_namespace = if name.namespace.is_none() {
582 enclosing_namespace
583 } else {
584 &name.namespace
585 };
586 match lookup.get(field_name) {
587 Some(idx) => {
588 let field = &fields[*idx];
589 Value::accumulate(
590 acc,
591 record_field.validate_internal(
592 &field.schema,
593 names,
594 record_namespace,
595 ),
596 )
597 }
598 None => Value::accumulate(
599 acc,
600 Some(format!("There is no schema field for field '{field_name}'")),
601 ),
602 }
603 })
604 }
605 (Value::Map(items), Schema::Record(RecordSchema { fields, .. })) => {
606 fields.iter().fold(None, |acc, field| {
607 if let Some(item) = items.get(&field.name) {
608 let res = item.validate_internal(&field.schema, names, enclosing_namespace);
609 Value::accumulate(acc, res)
610 } else if !field.is_nullable() {
611 Value::accumulate(
612 acc,
613 Some(format!(
614 "Field with name '{:?}' is not a member of the map items",
615 field.name
616 )),
617 )
618 } else {
619 acc
620 }
621 })
622 }
623 (v, s) => Some(format!(
624 "Unsupported value-schema combination! Value: {v:?}, schema: {s:?}"
625 )),
626 }
627 }
628
629 pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
636 self.resolve_schemata(schema, Vec::with_capacity(0))
637 }
638
639 pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
646 let enclosing_namespace = schema.namespace();
647 let rs = if schemata.is_empty() {
648 ResolvedSchema::try_from(schema)?
649 } else {
650 ResolvedSchema::try_from(schemata)?
651 };
652 self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
653 }
654
655 pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
656 mut self,
657 schema: &Schema,
658 names: &HashMap<Name, S>,
659 enclosing_namespace: &Namespace,
660 field_default: &Option<JsonValue>,
661 ) -> AvroResult<Self> {
662 if SchemaKind::from(&self) == SchemaKind::Union
664 && SchemaKind::from(schema) != SchemaKind::Union
665 {
666 let v = match self {
668 Value::Union(_i, b) => *b,
669 _ => unreachable!(),
670 };
671 self = v;
672 }
673 match *schema {
674 Schema::Ref { ref name } => {
675 let name = name.fully_qualified_name(enclosing_namespace);
676
677 if let Some(resolved) = names.get(&name) {
678 debug!("Resolved {name:?}");
679 self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
680 } else {
681 error!("Failed to resolve schema {name:?}");
682 Err(Error::SchemaResolutionError(name.clone()))
683 }
684 }
685 Schema::Null => self.resolve_null(),
686 Schema::Boolean => self.resolve_boolean(),
687 Schema::Int => self.resolve_int(),
688 Schema::Long => self.resolve_long(),
689 Schema::Float => self.resolve_float(),
690 Schema::Double => self.resolve_double(),
691 Schema::Bytes => self.resolve_bytes(),
692 Schema::String => self.resolve_string(),
693 Schema::Fixed(FixedSchema { size, .. }) => self.resolve_fixed(size),
694 Schema::Union(ref inner) => {
695 self.resolve_union(inner, names, enclosing_namespace, field_default)
696 }
697 Schema::Enum(EnumSchema {
698 ref symbols,
699 ref default,
700 ..
701 }) => self.resolve_enum(symbols, default, field_default),
702 Schema::Array(ref inner) => {
703 self.resolve_array(&inner.items, names, enclosing_namespace)
704 }
705 Schema::Map(ref inner) => self.resolve_map(&inner.types, names, enclosing_namespace),
706 Schema::Record(RecordSchema { ref fields, .. }) => {
707 self.resolve_record(fields, names, enclosing_namespace)
708 }
709 Schema::Decimal(DecimalSchema {
710 scale,
711 precision,
712 ref inner,
713 }) => self.resolve_decimal(precision, scale, inner),
714 Schema::BigDecimal => self.resolve_bigdecimal(),
715 Schema::Date => self.resolve_date(),
716 Schema::TimeMillis => self.resolve_time_millis(),
717 Schema::TimeMicros => self.resolve_time_micros(),
718 Schema::TimestampMillis => self.resolve_timestamp_millis(),
719 Schema::TimestampMicros => self.resolve_timestamp_micros(),
720 Schema::TimestampNanos => self.resolve_timestamp_nanos(),
721 Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
722 Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
723 Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
724 Schema::Duration => self.resolve_duration(),
725 Schema::Uuid => self.resolve_uuid(),
726 }
727 }
728
729 fn resolve_uuid(self) -> Result<Self, Error> {
730 Ok(match self {
731 uuid @ Value::Uuid(_) => uuid,
732 Value::String(ref string) => {
733 Value::Uuid(Uuid::from_str(string).map_err(Error::ConvertStrToUuid)?)
734 }
735 other => return Err(Error::GetUuid(other)),
736 })
737 }
738
739 fn resolve_bigdecimal(self) -> Result<Self, Error> {
740 Ok(match self {
741 bg @ Value::BigDecimal(_) => bg,
742 Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
743 other => return Err(Error::GetBigDecimal(other)),
744 })
745 }
746
747 fn resolve_duration(self) -> Result<Self, Error> {
748 Ok(match self {
749 duration @ Value::Duration { .. } => duration,
750 Value::Fixed(size, bytes) => {
751 if size != 12 {
752 return Err(Error::GetDecimalFixedBytes(size));
753 }
754 Value::Duration(Duration::from([
755 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
756 bytes[8], bytes[9], bytes[10], bytes[11],
757 ]))
758 }
759 other => return Err(Error::ResolveDuration(other)),
760 })
761 }
762
763 fn resolve_decimal(
764 self,
765 precision: Precision,
766 scale: Scale,
767 inner: &Schema,
768 ) -> Result<Self, Error> {
769 if scale > precision {
770 return Err(Error::GetScaleAndPrecision { scale, precision });
771 }
772 match inner {
773 &Schema::Fixed(FixedSchema { size, .. }) => {
774 if max_prec_for_len(size)? < precision {
775 return Err(Error::GetScaleWithFixedSize { size, precision });
776 }
777 }
778 Schema::Bytes => (),
779 _ => return Err(Error::ResolveDecimalSchema(inner.into())),
780 };
781 match self {
782 Value::Decimal(num) => {
783 let num_bytes = num.len();
784 if max_prec_for_len(num_bytes)? < precision {
785 Err(Error::ComparePrecisionAndSize {
786 precision,
787 num_bytes,
788 })
789 } else {
790 Ok(Value::Decimal(num))
791 }
792 }
794 Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
795 if max_prec_for_len(bytes.len())? < precision {
796 Err(Error::ComparePrecisionAndSize {
797 precision,
798 num_bytes: bytes.len(),
799 })
800 } else {
801 Ok(Value::Decimal(Decimal::from(bytes)))
803 }
804 }
805 other => Err(Error::ResolveDecimal(other)),
806 }
807 }
808
809 fn resolve_date(self) -> Result<Self, Error> {
810 match self {
811 Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
812 other => Err(Error::GetDate(other)),
813 }
814 }
815
816 fn resolve_time_millis(self) -> Result<Self, Error> {
817 match self {
818 Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
819 other => Err(Error::GetTimeMillis(other)),
820 }
821 }
822
823 fn resolve_time_micros(self) -> Result<Self, Error> {
824 match self {
825 Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
826 Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
827 other => Err(Error::GetTimeMicros(other)),
828 }
829 }
830
831 fn resolve_timestamp_millis(self) -> Result<Self, Error> {
832 match self {
833 Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
834 Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
835 other => Err(Error::GetTimestampMillis(other)),
836 }
837 }
838
839 fn resolve_timestamp_micros(self) -> Result<Self, Error> {
840 match self {
841 Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
842 Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
843 other => Err(Error::GetTimestampMicros(other)),
844 }
845 }
846
847 fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
848 match self {
849 Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
850 Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
851 other => Err(Error::GetTimestampNanos(other)),
852 }
853 }
854
855 fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
856 match self {
857 Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
858 Ok(Value::LocalTimestampMillis(ts))
859 }
860 Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
861 other => Err(Error::GetLocalTimestampMillis(other)),
862 }
863 }
864
865 fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
866 match self {
867 Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
868 Ok(Value::LocalTimestampMicros(ts))
869 }
870 Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
871 other => Err(Error::GetLocalTimestampMicros(other)),
872 }
873 }
874
875 fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
876 match self {
877 Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
878 Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
879 other => Err(Error::GetLocalTimestampNanos(other)),
880 }
881 }
882
883 fn resolve_null(self) -> Result<Self, Error> {
884 match self {
885 Value::Null => Ok(Value::Null),
886 other => Err(Error::GetNull(other)),
887 }
888 }
889
890 fn resolve_boolean(self) -> Result<Self, Error> {
891 match self {
892 Value::Boolean(b) => Ok(Value::Boolean(b)),
893 other => Err(Error::GetBoolean(other)),
894 }
895 }
896
897 fn resolve_int(self) -> Result<Self, Error> {
898 match self {
899 Value::Int(n) => Ok(Value::Int(n)),
900 Value::Long(n) => Ok(Value::Int(n as i32)),
901 other => Err(Error::GetInt(other)),
902 }
903 }
904
905 fn resolve_long(self) -> Result<Self, Error> {
906 match self {
907 Value::Int(n) => Ok(Value::Long(i64::from(n))),
908 Value::Long(n) => Ok(Value::Long(n)),
909 other => Err(Error::GetLong(other)),
910 }
911 }
912
913 fn resolve_float(self) -> Result<Self, Error> {
914 match self {
915 Value::Int(n) => Ok(Value::Float(n as f32)),
916 Value::Long(n) => Ok(Value::Float(n as f32)),
917 Value::Float(x) => Ok(Value::Float(x)),
918 Value::Double(x) => Ok(Value::Float(x as f32)),
919 Value::String(ref x) => match Self::parse_special_float(x) {
920 Some(f) => Ok(Value::Float(f)),
921 None => Err(Error::GetFloat(self)),
922 },
923 other => Err(Error::GetFloat(other)),
924 }
925 }
926
927 fn resolve_double(self) -> Result<Self, Error> {
928 match self {
929 Value::Int(n) => Ok(Value::Double(f64::from(n))),
930 Value::Long(n) => Ok(Value::Double(n as f64)),
931 Value::Float(x) => Ok(Value::Double(f64::from(x))),
932 Value::Double(x) => Ok(Value::Double(x)),
933 Value::String(ref x) => match Self::parse_special_float(x) {
934 Some(f) => Ok(Value::Double(f64::from(f))),
935 None => Err(Error::GetDouble(self)),
936 },
937 other => Err(Error::GetDouble(other)),
938 }
939 }
940
941 fn parse_special_float(value: &str) -> Option<f32> {
944 match value {
945 "NaN" => Some(f32::NAN),
946 "INF" | "Infinity" => Some(f32::INFINITY),
947 "-INF" | "-Infinity" => Some(f32::NEG_INFINITY),
948 _ => None,
949 }
950 }
951
952 fn resolve_bytes(self) -> Result<Self, Error> {
953 match self {
954 Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
955 Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
956 Value::Array(items) => Ok(Value::Bytes(
957 items
958 .into_iter()
959 .map(Value::try_u8)
960 .collect::<Result<Vec<_>, _>>()?,
961 )),
962 other => Err(Error::GetBytes(other)),
963 }
964 }
965
966 fn resolve_string(self) -> Result<Self, Error> {
967 match self {
968 Value::String(s) => Ok(Value::String(s)),
969 Value::Bytes(bytes) | Value::Fixed(_, bytes) => Ok(Value::String(
970 String::from_utf8(bytes).map_err(Error::ConvertToUtf8)?,
971 )),
972 other => Err(Error::GetString(other)),
973 }
974 }
975
976 fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
977 match self {
978 Value::Fixed(n, bytes) => {
979 if n == size {
980 Ok(Value::Fixed(n, bytes))
981 } else {
982 Err(Error::CompareFixedSizes { size, n })
983 }
984 }
985 Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())),
986 Value::Bytes(s) => {
987 if s.len() == size {
988 Ok(Value::Fixed(size, s))
989 } else {
990 Err(Error::CompareFixedSizes { size, n: s.len() })
991 }
992 }
993 other => Err(Error::GetStringForFixed(other)),
994 }
995 }
996
997 pub(crate) fn resolve_enum(
998 self,
999 symbols: &[String],
1000 enum_default: &Option<String>,
1001 _field_default: &Option<JsonValue>,
1002 ) -> Result<Self, Error> {
1003 let validate_symbol = |symbol: String, symbols: &[String]| {
1004 if let Some(index) = symbols.iter().position(|item| item == &symbol) {
1005 Ok(Value::Enum(index as u32, symbol))
1006 } else {
1007 match enum_default {
1008 Some(default) => {
1009 if let Some(index) = symbols.iter().position(|item| item == default) {
1010 Ok(Value::Enum(index as u32, default.clone()))
1011 } else {
1012 Err(Error::GetEnumDefault {
1013 symbol,
1014 symbols: symbols.into(),
1015 })
1016 }
1017 }
1018 _ => Err(Error::GetEnumDefault {
1019 symbol,
1020 symbols: symbols.into(),
1021 }),
1022 }
1023 }
1024 };
1025
1026 match self {
1027 Value::Enum(_raw_index, s) => validate_symbol(s, symbols),
1028 Value::String(s) => validate_symbol(s, symbols),
1029 other => Err(Error::GetEnum(other)),
1030 }
1031 }
1032
1033 fn resolve_union<S: Borrow<Schema> + Debug>(
1034 self,
1035 schema: &UnionSchema,
1036 names: &HashMap<Name, S>,
1037 enclosing_namespace: &Namespace,
1038 field_default: &Option<JsonValue>,
1039 ) -> Result<Self, Error> {
1040 let v = match self {
1041 Value::Union(_i, v) => *v,
1043 v => v,
1045 };
1046 let (i, inner) = schema
1047 .find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
1048 .ok_or(Error::FindUnionVariant)?;
1049
1050 Ok(Value::Union(
1051 i as u32,
1052 Box::new(v.resolve_internal(inner, names, enclosing_namespace, field_default)?),
1053 ))
1054 }
1055
1056 fn resolve_array<S: Borrow<Schema> + Debug>(
1057 self,
1058 schema: &Schema,
1059 names: &HashMap<Name, S>,
1060 enclosing_namespace: &Namespace,
1061 ) -> Result<Self, Error> {
1062 match self {
1063 Value::Array(items) => Ok(Value::Array(
1064 items
1065 .into_iter()
1066 .map(|item| item.resolve_internal(schema, names, enclosing_namespace, &None))
1067 .collect::<Result<_, _>>()?,
1068 )),
1069 other => Err(Error::GetArray {
1070 expected: schema.into(),
1071 other,
1072 }),
1073 }
1074 }
1075
1076 fn resolve_map<S: Borrow<Schema> + Debug>(
1077 self,
1078 schema: &Schema,
1079 names: &HashMap<Name, S>,
1080 enclosing_namespace: &Namespace,
1081 ) -> Result<Self, Error> {
1082 match self {
1083 Value::Map(items) => Ok(Value::Map(
1084 items
1085 .into_iter()
1086 .map(|(key, value)| {
1087 value
1088 .resolve_internal(schema, names, enclosing_namespace, &None)
1089 .map(|value| (key, value))
1090 })
1091 .collect::<Result<_, _>>()?,
1092 )),
1093 other => Err(Error::GetMap {
1094 expected: schema.into(),
1095 other,
1096 }),
1097 }
1098 }
1099
1100 fn resolve_record<S: Borrow<Schema> + Debug>(
1101 self,
1102 fields: &[RecordField],
1103 names: &HashMap<Name, S>,
1104 enclosing_namespace: &Namespace,
1105 ) -> Result<Self, Error> {
1106 let mut items = match self {
1107 Value::Map(items) => Ok(items),
1108 Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
1109 other => Err(Error::GetRecord {
1110 expected: fields
1111 .iter()
1112 .map(|field| (field.name.clone(), field.schema.clone().into()))
1113 .collect(),
1114 other,
1115 }),
1116 }?;
1117
1118 let new_fields = fields
1119 .iter()
1120 .map(|field| {
1121 let value = match items.remove(&field.name) {
1122 Some(value) => value,
1123 None => match field.default {
1124 Some(ref value) => match field.schema {
1125 Schema::Enum(EnumSchema {
1126 ref symbols,
1127 ref default,
1128 ..
1129 }) => Value::from(value.clone()).resolve_enum(
1130 symbols,
1131 default,
1132 &field.default.clone(),
1133 )?,
1134 Schema::Union(ref union_schema) => {
1135 let first = &union_schema.variants()[0];
1136 match first {
1139 Schema::Null => Value::Union(0, Box::new(Value::Null)),
1140 _ => Value::Union(
1141 0,
1142 Box::new(Value::from(value.clone()).resolve_internal(
1143 first,
1144 names,
1145 enclosing_namespace,
1146 &field.default,
1147 )?),
1148 ),
1149 }
1150 }
1151 _ => Value::from(value.clone()),
1152 },
1153 None => {
1154 return Err(Error::GetField(field.name.clone()));
1155 }
1156 },
1157 };
1158 value
1159 .resolve_internal(&field.schema, names, enclosing_namespace, &field.default)
1160 .map(|value| (field.name.clone(), value))
1161 })
1162 .collect::<Result<Vec<_>, _>>()?;
1163
1164 Ok(Value::Record(new_fields))
1165 }
1166
1167 fn try_u8(self) -> AvroResult<u8> {
1168 let int = self.resolve(&Schema::Int)?;
1169 if let Value::Int(n) = int {
1170 if n >= 0 && n <= i32::from(u8::MAX) {
1171 return Ok(n as u8);
1172 }
1173 }
1174
1175 Err(Error::GetU8(int))
1176 }
1177}
1178
1179#[cfg(test)]
1180mod tests {
1181 use super::*;
1182 use crate::{
1183 duration::{Days, Millis, Months},
1184 schema::RecordFieldOrder,
1185 };
1186 use apache_avro_test_helper::{
1187 logger::{assert_logged, assert_not_logged},
1188 TestResult,
1189 };
1190 use num_bigint::BigInt;
1191 use pretty_assertions::assert_eq;
1192 use serde_json::json;
1193
1194 #[test]
1195 fn avro_3809_validate_nested_records_with_implicit_namespace() -> TestResult {
1196 let schema = Schema::parse_str(
1197 r#"{
1198 "name": "record_name",
1199 "namespace": "space",
1200 "type": "record",
1201 "fields": [
1202 {
1203 "name": "outer_field_1",
1204 "type": {
1205 "type": "record",
1206 "name": "middle_record_name",
1207 "namespace": "middle_namespace",
1208 "fields": [
1209 {
1210 "name": "middle_field_1",
1211 "type": {
1212 "type": "record",
1213 "name": "inner_record_name",
1214 "fields": [
1215 { "name": "inner_field_1", "type": "double" }
1216 ]
1217 }
1218 },
1219 { "name": "middle_field_2", "type": "inner_record_name" }
1220 ]
1221 }
1222 }
1223 ]
1224 }"#,
1225 )?;
1226 let value = Value::Record(vec![(
1227 "outer_field_1".into(),
1228 Value::Record(vec![
1229 (
1230 "middle_field_1".into(),
1231 Value::Record(vec![("inner_field_1".into(), Value::Double(1.2f64))]),
1232 ),
1233 (
1234 "middle_field_2".into(),
1235 Value::Record(vec![("inner_field_1".into(), Value::Double(1.6f64))]),
1236 ),
1237 ]),
1238 )]);
1239
1240 assert!(value.validate(&schema));
1241 Ok(())
1242 }
1243
1244 #[test]
1245 fn validate() -> TestResult {
1246 let value_schema_valid = vec![
1247 (Value::Int(42), Schema::Int, true, ""),
1248 (Value::Int(43), Schema::Long, true, ""),
1249 (Value::Float(43.2), Schema::Float, true, ""),
1250 (Value::Float(45.9), Schema::Double, true, ""),
1251 (
1252 Value::Int(42),
1253 Schema::Boolean,
1254 false,
1255 "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination! Value: Int(42), schema: Boolean",
1256 ),
1257 (
1258 Value::Union(0, Box::new(Value::Null)),
1259 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1260 true,
1261 "",
1262 ),
1263 (
1264 Value::Union(1, Box::new(Value::Int(42))),
1265 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1266 true,
1267 "",
1268 ),
1269 (
1270 Value::Union(0, Box::new(Value::Null)),
1271 Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int])?),
1272 false,
1273 "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination! Value: Null, schema: Double",
1274 ),
1275 (
1276 Value::Union(3, Box::new(Value::Int(42))),
1277 Schema::Union(
1278 UnionSchema::new(vec![
1279 Schema::Null,
1280 Schema::Double,
1281 Schema::String,
1282 Schema::Int,
1283 ])
1284 ?,
1285 ),
1286 true,
1287 "",
1288 ),
1289 (
1290 Value::Union(1, Box::new(Value::Long(42i64))),
1291 Schema::Union(
1292 UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis])?,
1293 ),
1294 true,
1295 "",
1296 ),
1297 (
1298 Value::Union(2, Box::new(Value::Long(1_i64))),
1299 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1300 false,
1301 "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
1302 ),
1303 (
1304 Value::Array(vec![Value::Long(42i64)]),
1305 Schema::array(Schema::Long),
1306 true,
1307 "",
1308 ),
1309 (
1310 Value::Array(vec![Value::Boolean(true)]),
1311 Schema::array(Schema::Long),
1312 false,
1313 "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long",
1314 ),
1315 (Value::Record(vec![]), Schema::Null, false, "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination! Value: Record([]), schema: Null"),
1316 (
1317 Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
1318 Schema::Duration,
1319 true,
1320 "",
1321 ),
1322 (
1323 Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1324 Schema::Duration,
1325 false,
1326 "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be a Duration",
1327 ),
1328 (
1329 Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
1330 Schema::Record(RecordSchema {
1331 name: Name::new("record_name").unwrap(),
1332 aliases: None,
1333 doc: None,
1334 fields: vec![RecordField {
1335 name: "field_name".to_string(),
1336 doc: None,
1337 default: None,
1338 aliases: None,
1339 schema: Schema::Int,
1340 order: RecordFieldOrder::Ignore,
1341 position: 0,
1342 custom_attributes: Default::default(),
1343 }],
1344 lookup: Default::default(),
1345 attributes: Default::default(),
1346 }),
1347 false,
1348 r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }). Reason: There is no schema field for field 'unknown_field_name'"#,
1349 ),
1350 (
1351 Value::Record(vec![("field_name".to_string(), Value::Null)]),
1352 Schema::Record(RecordSchema {
1353 name: Name::new("record_name").unwrap(),
1354 aliases: None,
1355 doc: None,
1356 fields: vec![RecordField {
1357 name: "field_name".to_string(),
1358 doc: None,
1359 default: None,
1360 aliases: None,
1361 schema: Schema::Ref {
1362 name: Name::new("missing").unwrap(),
1363 },
1364 order: RecordFieldOrder::Ignore,
1365 position: 0,
1366 custom_attributes: Default::default(),
1367 }],
1368 lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
1369 attributes: Default::default(),
1370 }),
1371 false,
1372 r#"Invalid value: Record([("field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }). Reason: Unresolved schema reference: 'Name { name: "missing", namespace: None }'. Parsed names: []"#,
1373 ),
1374 ];
1375
1376 for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
1377 let err_message =
1378 value.validate_internal::<Schema>(&schema, &HashMap::default(), &None);
1379 assert_eq!(valid, err_message.is_none());
1380 if !valid {
1381 let full_err_message = format!(
1382 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1383 value,
1384 schema,
1385 err_message.unwrap()
1386 );
1387 assert_eq!(expected_err_message, full_err_message);
1388 }
1389 }
1390
1391 Ok(())
1392 }
1393
1394 #[test]
1395 fn validate_fixed() -> TestResult {
1396 let schema = Schema::Fixed(FixedSchema {
1397 size: 4,
1398 name: Name::new("some_fixed").unwrap(),
1399 aliases: None,
1400 doc: None,
1401 default: None,
1402 attributes: Default::default(),
1403 });
1404
1405 assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
1406 let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
1407 assert!(!value.validate(&schema));
1408 assert_logged(
1409 format!(
1410 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1411 value, schema, "The value's size (5) is different than the schema's size (4)"
1412 )
1413 .as_str(),
1414 );
1415
1416 assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
1417 let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
1418 assert!(!value.validate(&schema));
1419 assert_logged(
1420 format!(
1421 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1422 value, schema, "The bytes' length (5) is different than the schema's size (4)"
1423 )
1424 .as_str(),
1425 );
1426
1427 Ok(())
1428 }
1429
1430 #[test]
1431 fn validate_enum() -> TestResult {
1432 let schema = Schema::Enum(EnumSchema {
1433 name: Name::new("some_enum").unwrap(),
1434 aliases: None,
1435 doc: None,
1436 symbols: vec![
1437 "spades".to_string(),
1438 "hearts".to_string(),
1439 "diamonds".to_string(),
1440 "clubs".to_string(),
1441 ],
1442 default: None,
1443 attributes: Default::default(),
1444 });
1445
1446 assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
1447 assert!(Value::String("spades".to_string()).validate(&schema));
1448
1449 let value = Value::Enum(1, "spades".to_string());
1450 assert!(!value.validate(&schema));
1451 assert_logged(
1452 format!(
1453 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1454 value, schema, "Symbol 'spades' is not at position '1'"
1455 )
1456 .as_str(),
1457 );
1458
1459 let value = Value::Enum(1000, "spades".to_string());
1460 assert!(!value.validate(&schema));
1461 assert_logged(
1462 format!(
1463 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1464 value, schema, "No symbol at position '1000'"
1465 )
1466 .as_str(),
1467 );
1468
1469 let value = Value::String("lorem".to_string());
1470 assert!(!value.validate(&schema));
1471 assert_logged(
1472 format!(
1473 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1474 value, schema, "'lorem' is not a member of the possible symbols"
1475 )
1476 .as_str(),
1477 );
1478
1479 let other_schema = Schema::Enum(EnumSchema {
1480 name: Name::new("some_other_enum").unwrap(),
1481 aliases: None,
1482 doc: None,
1483 symbols: vec![
1484 "hearts".to_string(),
1485 "diamonds".to_string(),
1486 "clubs".to_string(),
1487 "spades".to_string(),
1488 ],
1489 default: None,
1490 attributes: Default::default(),
1491 });
1492
1493 let value = Value::Enum(0, "spades".to_string());
1494 assert!(!value.validate(&other_schema));
1495 assert_logged(
1496 format!(
1497 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1498 value, other_schema, "Symbol 'spades' is not at position '0'"
1499 )
1500 .as_str(),
1501 );
1502
1503 Ok(())
1504 }
1505
1506 #[test]
1507 fn validate_record() -> TestResult {
1508 let schema = Schema::Record(RecordSchema {
1521 name: Name::new("some_record").unwrap(),
1522 aliases: None,
1523 doc: None,
1524 fields: vec![
1525 RecordField {
1526 name: "a".to_string(),
1527 doc: None,
1528 default: None,
1529 aliases: None,
1530 schema: Schema::Long,
1531 order: RecordFieldOrder::Ascending,
1532 position: 0,
1533 custom_attributes: Default::default(),
1534 },
1535 RecordField {
1536 name: "b".to_string(),
1537 doc: None,
1538 default: None,
1539 aliases: None,
1540 schema: Schema::String,
1541 order: RecordFieldOrder::Ascending,
1542 position: 1,
1543 custom_attributes: Default::default(),
1544 },
1545 RecordField {
1546 name: "c".to_string(),
1547 doc: None,
1548 default: Some(JsonValue::Null),
1549 aliases: None,
1550 schema: Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1551 order: RecordFieldOrder::Ascending,
1552 position: 2,
1553 custom_attributes: Default::default(),
1554 },
1555 ],
1556 lookup: [
1557 ("a".to_string(), 0),
1558 ("b".to_string(), 1),
1559 ("c".to_string(), 2),
1560 ]
1561 .iter()
1562 .cloned()
1563 .collect(),
1564 attributes: Default::default(),
1565 });
1566
1567 assert!(Value::Record(vec![
1568 ("a".to_string(), Value::Long(42i64)),
1569 ("b".to_string(), Value::String("foo".to_string())),
1570 ])
1571 .validate(&schema));
1572
1573 let value = Value::Record(vec![
1574 ("b".to_string(), Value::String("foo".to_string())),
1575 ("a".to_string(), Value::Long(42i64)),
1576 ]);
1577 assert!(value.validate(&schema));
1578
1579 let value = Value::Record(vec![
1580 ("a".to_string(), Value::Boolean(false)),
1581 ("b".to_string(), Value::String("foo".to_string())),
1582 ]);
1583 assert!(!value.validate(&schema));
1584 assert_logged(
1585 r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(false), schema: Long"#,
1586 );
1587
1588 let value = Value::Record(vec![
1589 ("a".to_string(), Value::Long(42i64)),
1590 ("c".to_string(), Value::String("foo".to_string())),
1591 ]);
1592 assert!(!value.validate(&schema));
1593 assert_logged(
1594 r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Could not find matching type in union"#,
1595 );
1596 assert_not_logged(
1597 r#"Invalid value: String("foo") for schema: Int. Reason: Unsupported value-schema combination"#,
1598 );
1599
1600 let value = Value::Record(vec![
1601 ("a".to_string(), Value::Long(42i64)),
1602 ("d".to_string(), Value::String("foo".to_string())),
1603 ]);
1604 assert!(!value.validate(&schema));
1605 assert_logged(
1606 r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: There is no schema field for field 'd'"#,
1607 );
1608
1609 let value = Value::Record(vec![
1610 ("a".to_string(), Value::Long(42i64)),
1611 ("b".to_string(), Value::String("foo".to_string())),
1612 ("c".to_string(), Value::Null),
1613 ("d".to_string(), Value::Null),
1614 ]);
1615 assert!(!value.validate(&schema));
1616 assert_logged(
1617 r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: The value's records length (4) is greater than the schema's (3 fields)"#,
1618 );
1619
1620 assert!(Value::Map(
1621 vec![
1622 ("a".to_string(), Value::Long(42i64)),
1623 ("b".to_string(), Value::String("foo".to_string())),
1624 ]
1625 .into_iter()
1626 .collect()
1627 )
1628 .validate(&schema));
1629
1630 assert!(!Value::Map(
1631 vec![("d".to_string(), Value::Long(123_i64)),]
1632 .into_iter()
1633 .collect()
1634 )
1635 .validate(&schema));
1636 assert_logged(
1637 r#"Invalid value: Map({"d": Long(123)}) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Field with name '"a"' is not a member of the map items
1638Field with name '"b"' is not a member of the map items"#,
1639 );
1640
1641 let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema])?);
1642
1643 assert!(Value::Union(
1644 1,
1645 Box::new(Value::Record(vec![
1646 ("a".to_string(), Value::Long(42i64)),
1647 ("b".to_string(), Value::String("foo".to_string())),
1648 ]))
1649 )
1650 .validate(&union_schema));
1651
1652 assert!(Value::Union(
1653 1,
1654 Box::new(Value::Map(
1655 vec![
1656 ("a".to_string(), Value::Long(42i64)),
1657 ("b".to_string(), Value::String("foo".to_string())),
1658 ]
1659 .into_iter()
1660 .collect()
1661 ))
1662 )
1663 .validate(&union_schema));
1664
1665 Ok(())
1666 }
1667
1668 #[test]
1669 fn resolve_bytes_ok() -> TestResult {
1670 let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
1671 assert_eq!(
1672 value.resolve(&Schema::Bytes)?,
1673 Value::Bytes(vec![0u8, 42u8])
1674 );
1675
1676 Ok(())
1677 }
1678
1679 #[test]
1680 fn resolve_string_from_bytes() -> TestResult {
1681 let value = Value::Bytes(vec![97, 98, 99]);
1682 assert_eq!(
1683 value.resolve(&Schema::String)?,
1684 Value::String("abc".to_string())
1685 );
1686
1687 Ok(())
1688 }
1689
1690 #[test]
1691 fn resolve_string_from_fixed() -> TestResult {
1692 let value = Value::Fixed(3, vec![97, 98, 99]);
1693 assert_eq!(
1694 value.resolve(&Schema::String)?,
1695 Value::String("abc".to_string())
1696 );
1697
1698 Ok(())
1699 }
1700
1701 #[test]
1702 fn resolve_bytes_failure() {
1703 let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
1704 assert!(value.resolve(&Schema::Bytes).is_err());
1705 }
1706
1707 #[test]
1708 fn resolve_decimal_bytes() -> TestResult {
1709 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1710 value.clone().resolve(&Schema::Decimal(DecimalSchema {
1711 precision: 10,
1712 scale: 4,
1713 inner: Box::new(Schema::Bytes),
1714 }))?;
1715 assert!(value.resolve(&Schema::String).is_err());
1716
1717 Ok(())
1718 }
1719
1720 #[test]
1721 fn resolve_decimal_invalid_scale() {
1722 let value = Value::Decimal(Decimal::from(vec![1, 2]));
1723 assert!(value
1724 .resolve(&Schema::Decimal(DecimalSchema {
1725 precision: 2,
1726 scale: 3,
1727 inner: Box::new(Schema::Bytes),
1728 }))
1729 .is_err());
1730 }
1731
1732 #[test]
1733 fn resolve_decimal_invalid_precision_for_length() {
1734 let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
1735 assert!(value
1736 .resolve(&Schema::Decimal(DecimalSchema {
1737 precision: 1,
1738 scale: 0,
1739 inner: Box::new(Schema::Bytes),
1740 }))
1741 .is_ok());
1742 }
1743
1744 #[test]
1745 fn resolve_decimal_fixed() {
1746 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1747 assert!(value
1748 .clone()
1749 .resolve(&Schema::Decimal(DecimalSchema {
1750 precision: 10,
1751 scale: 1,
1752 inner: Box::new(Schema::Fixed(FixedSchema {
1753 name: Name::new("decimal").unwrap(),
1754 aliases: None,
1755 size: 20,
1756 doc: None,
1757 default: None,
1758 attributes: Default::default(),
1759 }))
1760 }))
1761 .is_ok());
1762 assert!(value.resolve(&Schema::String).is_err());
1763 }
1764
1765 #[test]
1766 fn resolve_date() {
1767 let value = Value::Date(2345);
1768 assert!(value.clone().resolve(&Schema::Date).is_ok());
1769 assert!(value.resolve(&Schema::String).is_err());
1770 }
1771
1772 #[test]
1773 fn resolve_time_millis() {
1774 let value = Value::TimeMillis(10);
1775 assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1776 assert!(value.resolve(&Schema::TimeMicros).is_err());
1777 }
1778
1779 #[test]
1780 fn resolve_time_micros() {
1781 let value = Value::TimeMicros(10);
1782 assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1783 assert!(value.resolve(&Schema::TimeMillis).is_err());
1784 }
1785
1786 #[test]
1787 fn resolve_timestamp_millis() {
1788 let value = Value::TimestampMillis(10);
1789 assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1790 assert!(value.resolve(&Schema::Float).is_err());
1791
1792 let value = Value::Float(10.0f32);
1793 assert!(value.resolve(&Schema::TimestampMillis).is_err());
1794 }
1795
1796 #[test]
1797 fn resolve_timestamp_micros() {
1798 let value = Value::TimestampMicros(10);
1799 assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1800 assert!(value.resolve(&Schema::Int).is_err());
1801
1802 let value = Value::Double(10.0);
1803 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1804 }
1805
1806 #[test]
1807 fn test_avro_3914_resolve_timestamp_nanos() {
1808 let value = Value::TimestampNanos(10);
1809 assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
1810 assert!(value.resolve(&Schema::Int).is_err());
1811
1812 let value = Value::Double(10.0);
1813 assert!(value.resolve(&Schema::TimestampNanos).is_err());
1814 }
1815
1816 #[test]
1817 fn test_avro_3853_resolve_timestamp_millis() {
1818 let value = Value::LocalTimestampMillis(10);
1819 assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
1820 assert!(value.resolve(&Schema::Float).is_err());
1821
1822 let value = Value::Float(10.0f32);
1823 assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
1824 }
1825
1826 #[test]
1827 fn test_avro_3853_resolve_timestamp_micros() {
1828 let value = Value::LocalTimestampMicros(10);
1829 assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
1830 assert!(value.resolve(&Schema::Int).is_err());
1831
1832 let value = Value::Double(10.0);
1833 assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
1834 }
1835
1836 #[test]
1837 fn test_avro_3916_resolve_timestamp_nanos() {
1838 let value = Value::LocalTimestampNanos(10);
1839 assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
1840 assert!(value.resolve(&Schema::Int).is_err());
1841
1842 let value = Value::Double(10.0);
1843 assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
1844 }
1845
1846 #[test]
1847 fn resolve_duration() {
1848 let value = Value::Duration(Duration::new(
1849 Months::new(10),
1850 Days::new(5),
1851 Millis::new(3000),
1852 ));
1853 assert!(value.clone().resolve(&Schema::Duration).is_ok());
1854 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1855 assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
1856 }
1857
1858 #[test]
1859 fn resolve_uuid() -> TestResult {
1860 let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537")?);
1861 assert!(value.clone().resolve(&Schema::Uuid).is_ok());
1862 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1863
1864 Ok(())
1865 }
1866
1867 #[test]
1868 fn avro_3678_resolve_float_to_double() {
1869 let value = Value::Float(2345.1);
1870 assert!(value.resolve(&Schema::Double).is_ok());
1871 }
1872
1873 #[test]
1874 fn test_avro_3621_resolve_to_nullable_union() -> TestResult {
1875 let schema = Schema::parse_str(
1876 r#"{
1877 "type": "record",
1878 "name": "root",
1879 "fields": [
1880 {
1881 "name": "event",
1882 "type": [
1883 "null",
1884 {
1885 "type": "record",
1886 "name": "event",
1887 "fields": [
1888 {
1889 "name": "amount",
1890 "type": "int"
1891 },
1892 {
1893 "name": "size",
1894 "type": [
1895 "null",
1896 "int"
1897 ],
1898 "default": null
1899 }
1900 ]
1901 }
1902 ],
1903 "default": null
1904 }
1905 ]
1906 }"#,
1907 )?;
1908
1909 let value = Value::Record(vec![(
1910 "event".to_string(),
1911 Value::Record(vec![("amount".to_string(), Value::Int(200))]),
1912 )]);
1913 assert!(value.resolve(&schema).is_ok());
1914
1915 let value = Value::Record(vec![(
1916 "event".to_string(),
1917 Value::Record(vec![("size".to_string(), Value::Int(1))]),
1918 )]);
1919 assert!(value.resolve(&schema).is_err());
1920
1921 Ok(())
1922 }
1923
1924 #[test]
1925 fn json_from_avro() -> TestResult {
1926 assert_eq!(JsonValue::try_from(Value::Null)?, JsonValue::Null);
1927 assert_eq!(
1928 JsonValue::try_from(Value::Boolean(true))?,
1929 JsonValue::Bool(true)
1930 );
1931 assert_eq!(
1932 JsonValue::try_from(Value::Int(1))?,
1933 JsonValue::Number(1.into())
1934 );
1935 assert_eq!(
1936 JsonValue::try_from(Value::Long(1))?,
1937 JsonValue::Number(1.into())
1938 );
1939 assert_eq!(
1940 JsonValue::try_from(Value::Float(1.0))?,
1941 JsonValue::Number(Number::from_f64(1.0).unwrap())
1942 );
1943 assert_eq!(
1944 JsonValue::try_from(Value::Double(1.0))?,
1945 JsonValue::Number(Number::from_f64(1.0).unwrap())
1946 );
1947 assert_eq!(
1948 JsonValue::try_from(Value::Bytes(vec![1, 2, 3]))?,
1949 JsonValue::Array(vec![
1950 JsonValue::Number(1.into()),
1951 JsonValue::Number(2.into()),
1952 JsonValue::Number(3.into())
1953 ])
1954 );
1955 assert_eq!(
1956 JsonValue::try_from(Value::String("test".into()))?,
1957 JsonValue::String("test".into())
1958 );
1959 assert_eq!(
1960 JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3]))?,
1961 JsonValue::Array(vec![
1962 JsonValue::Number(1.into()),
1963 JsonValue::Number(2.into()),
1964 JsonValue::Number(3.into())
1965 ])
1966 );
1967 assert_eq!(
1968 JsonValue::try_from(Value::Enum(1, "test_enum".into()))?,
1969 JsonValue::String("test_enum".into())
1970 );
1971 assert_eq!(
1972 JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))?,
1973 JsonValue::String("test_enum".into())
1974 );
1975 assert_eq!(
1976 JsonValue::try_from(Value::Array(vec![
1977 Value::Int(1),
1978 Value::Int(2),
1979 Value::Int(3)
1980 ]))?,
1981 JsonValue::Array(vec![
1982 JsonValue::Number(1.into()),
1983 JsonValue::Number(2.into()),
1984 JsonValue::Number(3.into())
1985 ])
1986 );
1987 assert_eq!(
1988 JsonValue::try_from(Value::Map(
1989 vec![
1990 ("v1".to_string(), Value::Int(1)),
1991 ("v2".to_string(), Value::Int(2)),
1992 ("v3".to_string(), Value::Int(3))
1993 ]
1994 .into_iter()
1995 .collect()
1996 ))?,
1997 JsonValue::Object(
1998 vec![
1999 ("v1".to_string(), JsonValue::Number(1.into())),
2000 ("v2".to_string(), JsonValue::Number(2.into())),
2001 ("v3".to_string(), JsonValue::Number(3.into()))
2002 ]
2003 .into_iter()
2004 .collect()
2005 )
2006 );
2007 assert_eq!(
2008 JsonValue::try_from(Value::Record(vec![
2009 ("v1".to_string(), Value::Int(1)),
2010 ("v2".to_string(), Value::Int(2)),
2011 ("v3".to_string(), Value::Int(3))
2012 ]))?,
2013 JsonValue::Object(
2014 vec![
2015 ("v1".to_string(), JsonValue::Number(1.into())),
2016 ("v2".to_string(), JsonValue::Number(2.into())),
2017 ("v3".to_string(), JsonValue::Number(3.into()))
2018 ]
2019 .into_iter()
2020 .collect()
2021 )
2022 );
2023 assert_eq!(
2024 JsonValue::try_from(Value::Date(1))?,
2025 JsonValue::Number(1.into())
2026 );
2027 assert_eq!(
2028 JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into()))?,
2029 JsonValue::Array(vec![
2030 JsonValue::Number(1.into()),
2031 JsonValue::Number(2.into()),
2032 JsonValue::Number(3.into())
2033 ])
2034 );
2035 assert_eq!(
2036 JsonValue::try_from(Value::TimeMillis(1))?,
2037 JsonValue::Number(1.into())
2038 );
2039 assert_eq!(
2040 JsonValue::try_from(Value::TimeMicros(1))?,
2041 JsonValue::Number(1.into())
2042 );
2043 assert_eq!(
2044 JsonValue::try_from(Value::TimestampMillis(1))?,
2045 JsonValue::Number(1.into())
2046 );
2047 assert_eq!(
2048 JsonValue::try_from(Value::TimestampMicros(1))?,
2049 JsonValue::Number(1.into())
2050 );
2051 assert_eq!(
2052 JsonValue::try_from(Value::TimestampNanos(1))?,
2053 JsonValue::Number(1.into())
2054 );
2055 assert_eq!(
2056 JsonValue::try_from(Value::LocalTimestampMillis(1))?,
2057 JsonValue::Number(1.into())
2058 );
2059 assert_eq!(
2060 JsonValue::try_from(Value::LocalTimestampMicros(1))?,
2061 JsonValue::Number(1.into())
2062 );
2063 assert_eq!(
2064 JsonValue::try_from(Value::LocalTimestampNanos(1))?,
2065 JsonValue::Number(1.into())
2066 );
2067 assert_eq!(
2068 JsonValue::try_from(Value::Duration(
2069 [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
2070 ))?,
2071 JsonValue::Array(vec![
2072 JsonValue::Number(1.into()),
2073 JsonValue::Number(2.into()),
2074 JsonValue::Number(3.into()),
2075 JsonValue::Number(4.into()),
2076 JsonValue::Number(5.into()),
2077 JsonValue::Number(6.into()),
2078 JsonValue::Number(7.into()),
2079 JsonValue::Number(8.into()),
2080 JsonValue::Number(9.into()),
2081 JsonValue::Number(10.into()),
2082 JsonValue::Number(11.into()),
2083 JsonValue::Number(12.into()),
2084 ])
2085 );
2086 assert_eq!(
2087 JsonValue::try_from(Value::Uuid(Uuid::parse_str(
2088 "936DA01F-9ABD-4D9D-80C7-02AF85C822A8"
2089 )?))?,
2090 JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
2091 );
2092
2093 Ok(())
2094 }
2095
2096 #[test]
2097 fn test_avro_3433_recursive_resolves_record() -> TestResult {
2098 let schema = Schema::parse_str(
2099 r#"
2100 {
2101 "type":"record",
2102 "name":"TestStruct",
2103 "fields": [
2104 {
2105 "name":"a",
2106 "type":{
2107 "type":"record",
2108 "name": "Inner",
2109 "fields": [ {
2110 "name":"z",
2111 "type":"int"
2112 }]
2113 }
2114 },
2115 {
2116 "name":"b",
2117 "type":"Inner"
2118 }
2119 ]
2120 }"#,
2121 )?;
2122
2123 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2124 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2125 let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2126 outer
2127 .resolve(&schema)
2128 .expect("Record definition defined in one field must be available in other field");
2129
2130 Ok(())
2131 }
2132
2133 #[test]
2134 fn test_avro_3433_recursive_resolves_array() -> TestResult {
2135 let schema = Schema::parse_str(
2136 r#"
2137 {
2138 "type":"record",
2139 "name":"TestStruct",
2140 "fields": [
2141 {
2142 "name":"a",
2143 "type":{
2144 "type":"array",
2145 "items": {
2146 "type":"record",
2147 "name": "Inner",
2148 "fields": [ {
2149 "name":"z",
2150 "type":"int"
2151 }]
2152 }
2153 }
2154 },
2155 {
2156 "name":"b",
2157 "type": {
2158 "type":"map",
2159 "values":"Inner"
2160 }
2161 }
2162 ]
2163 }"#,
2164 )?;
2165
2166 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2167 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2168 let outer_value = Value::Record(vec![
2169 ("a".into(), Value::Array(vec![inner_value1])),
2170 (
2171 "b".into(),
2172 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2173 ),
2174 ]);
2175 outer_value
2176 .resolve(&schema)
2177 .expect("Record defined in array definition must be resolvable from map");
2178
2179 Ok(())
2180 }
2181
2182 #[test]
2183 fn test_avro_3433_recursive_resolves_map() -> TestResult {
2184 let schema = Schema::parse_str(
2185 r#"
2186 {
2187 "type":"record",
2188 "name":"TestStruct",
2189 "fields": [
2190 {
2191 "name":"a",
2192 "type":{
2193 "type":"record",
2194 "name": "Inner",
2195 "fields": [ {
2196 "name":"z",
2197 "type":"int"
2198 }]
2199 }
2200 },
2201 {
2202 "name":"b",
2203 "type": {
2204 "type":"map",
2205 "values":"Inner"
2206 }
2207 }
2208 ]
2209 }"#,
2210 )?;
2211
2212 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2213 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2214 let outer_value = Value::Record(vec![
2215 ("a".into(), inner_value1),
2216 (
2217 "b".into(),
2218 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2219 ),
2220 ]);
2221 outer_value
2222 .resolve(&schema)
2223 .expect("Record defined in record field must be resolvable from map field");
2224
2225 Ok(())
2226 }
2227
2228 #[test]
2229 fn test_avro_3433_recursive_resolves_record_wrapper() -> TestResult {
2230 let schema = Schema::parse_str(
2231 r#"
2232 {
2233 "type":"record",
2234 "name":"TestStruct",
2235 "fields": [
2236 {
2237 "name":"a",
2238 "type":{
2239 "type":"record",
2240 "name": "Inner",
2241 "fields": [ {
2242 "name":"z",
2243 "type":"int"
2244 }]
2245 }
2246 },
2247 {
2248 "name":"b",
2249 "type": {
2250 "type":"record",
2251 "name": "InnerWrapper",
2252 "fields": [ {
2253 "name":"j",
2254 "type":"Inner"
2255 }]
2256 }
2257 }
2258 ]
2259 }"#,
2260 )?;
2261
2262 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2263 let inner_value2 = Value::Record(vec![(
2264 "j".into(),
2265 Value::Record(vec![("z".into(), Value::Int(6))]),
2266 )]);
2267 let outer_value =
2268 Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2269 outer_value.resolve(&schema).expect("Record schema defined in field must be resolvable in Record schema defined in other field");
2270
2271 Ok(())
2272 }
2273
2274 #[test]
2275 fn test_avro_3433_recursive_resolves_map_and_array() -> TestResult {
2276 let schema = Schema::parse_str(
2277 r#"
2278 {
2279 "type":"record",
2280 "name":"TestStruct",
2281 "fields": [
2282 {
2283 "name":"a",
2284 "type":{
2285 "type":"map",
2286 "values": {
2287 "type":"record",
2288 "name": "Inner",
2289 "fields": [ {
2290 "name":"z",
2291 "type":"int"
2292 }]
2293 }
2294 }
2295 },
2296 {
2297 "name":"b",
2298 "type": {
2299 "type":"array",
2300 "items":"Inner"
2301 }
2302 }
2303 ]
2304 }"#,
2305 )?;
2306
2307 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2308 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2309 let outer_value = Value::Record(vec![
2310 (
2311 "a".into(),
2312 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2313 ),
2314 ("b".into(), Value::Array(vec![inner_value1])),
2315 ]);
2316 outer_value
2317 .resolve(&schema)
2318 .expect("Record defined in map definition must be resolvable from array");
2319
2320 Ok(())
2321 }
2322
2323 #[test]
2324 fn test_avro_3433_recursive_resolves_union() -> TestResult {
2325 let schema = Schema::parse_str(
2326 r#"
2327 {
2328 "type":"record",
2329 "name":"TestStruct",
2330 "fields": [
2331 {
2332 "name":"a",
2333 "type":["null", {
2334 "type":"record",
2335 "name": "Inner",
2336 "fields": [ {
2337 "name":"z",
2338 "type":"int"
2339 }]
2340 }]
2341 },
2342 {
2343 "name":"b",
2344 "type":"Inner"
2345 }
2346 ]
2347 }"#,
2348 )?;
2349
2350 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2351 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2352 let outer1 = Value::Record(vec![
2353 ("a".into(), inner_value1),
2354 ("b".into(), inner_value2.clone()),
2355 ]);
2356 outer1
2357 .resolve(&schema)
2358 .expect("Record definition defined in union must be resolved in other field");
2359 let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]);
2360 outer2
2361 .resolve(&schema)
2362 .expect("Record definition defined in union must be resolved in other field");
2363
2364 Ok(())
2365 }
2366
2367 #[test]
2368 fn test_avro_3461_test_multi_level_resolve_outer_namespace() -> TestResult {
2369 let schema = r#"
2370 {
2371 "name": "record_name",
2372 "namespace": "space",
2373 "type": "record",
2374 "fields": [
2375 {
2376 "name": "outer_field_1",
2377 "type": [
2378 "null",
2379 {
2380 "type": "record",
2381 "name": "middle_record_name",
2382 "fields":[
2383 {
2384 "name":"middle_field_1",
2385 "type":[
2386 "null",
2387 {
2388 "type":"record",
2389 "name":"inner_record_name",
2390 "fields":[
2391 {
2392 "name":"inner_field_1",
2393 "type":"double"
2394 }
2395 ]
2396 }
2397 ]
2398 }
2399 ]
2400 }
2401 ]
2402 },
2403 {
2404 "name": "outer_field_2",
2405 "type" : "space.inner_record_name"
2406 }
2407 ]
2408 }
2409 "#;
2410 let schema = Schema::parse_str(schema)?;
2411 let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2412 let middle_record_variation_1 = Value::Record(vec![(
2413 "middle_field_1".into(),
2414 Value::Union(0, Box::new(Value::Null)),
2415 )]);
2416 let middle_record_variation_2 = Value::Record(vec![(
2417 "middle_field_1".into(),
2418 Value::Union(1, Box::new(inner_record.clone())),
2419 )]);
2420 let outer_record_variation_1 = Value::Record(vec![
2421 (
2422 "outer_field_1".into(),
2423 Value::Union(0, Box::new(Value::Null)),
2424 ),
2425 ("outer_field_2".into(), inner_record.clone()),
2426 ]);
2427 let outer_record_variation_2 = Value::Record(vec![
2428 (
2429 "outer_field_1".into(),
2430 Value::Union(1, Box::new(middle_record_variation_1)),
2431 ),
2432 ("outer_field_2".into(), inner_record.clone()),
2433 ]);
2434 let outer_record_variation_3 = Value::Record(vec![
2435 (
2436 "outer_field_1".into(),
2437 Value::Union(1, Box::new(middle_record_variation_2)),
2438 ),
2439 ("outer_field_2".into(), inner_record),
2440 ]);
2441
2442 outer_record_variation_1
2443 .resolve(&schema)
2444 .expect("Should be able to resolve value to the schema that is it's definition");
2445 outer_record_variation_2
2446 .resolve(&schema)
2447 .expect("Should be able to resolve value to the schema that is it's definition");
2448 outer_record_variation_3
2449 .resolve(&schema)
2450 .expect("Should be able to resolve value to the schema that is it's definition");
2451
2452 Ok(())
2453 }
2454
2455 #[test]
2456 fn test_avro_3461_test_multi_level_resolve_middle_namespace() -> TestResult {
2457 let schema = r#"
2458 {
2459 "name": "record_name",
2460 "namespace": "space",
2461 "type": "record",
2462 "fields": [
2463 {
2464 "name": "outer_field_1",
2465 "type": [
2466 "null",
2467 {
2468 "type": "record",
2469 "name": "middle_record_name",
2470 "namespace":"middle_namespace",
2471 "fields":[
2472 {
2473 "name":"middle_field_1",
2474 "type":[
2475 "null",
2476 {
2477 "type":"record",
2478 "name":"inner_record_name",
2479 "fields":[
2480 {
2481 "name":"inner_field_1",
2482 "type":"double"
2483 }
2484 ]
2485 }
2486 ]
2487 }
2488 ]
2489 }
2490 ]
2491 },
2492 {
2493 "name": "outer_field_2",
2494 "type" : "middle_namespace.inner_record_name"
2495 }
2496 ]
2497 }
2498 "#;
2499 let schema = Schema::parse_str(schema)?;
2500 let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2501 let middle_record_variation_1 = Value::Record(vec![(
2502 "middle_field_1".into(),
2503 Value::Union(0, Box::new(Value::Null)),
2504 )]);
2505 let middle_record_variation_2 = Value::Record(vec![(
2506 "middle_field_1".into(),
2507 Value::Union(1, Box::new(inner_record.clone())),
2508 )]);
2509 let outer_record_variation_1 = Value::Record(vec![
2510 (
2511 "outer_field_1".into(),
2512 Value::Union(0, Box::new(Value::Null)),
2513 ),
2514 ("outer_field_2".into(), inner_record.clone()),
2515 ]);
2516 let outer_record_variation_2 = Value::Record(vec![
2517 (
2518 "outer_field_1".into(),
2519 Value::Union(1, Box::new(middle_record_variation_1)),
2520 ),
2521 ("outer_field_2".into(), inner_record.clone()),
2522 ]);
2523 let outer_record_variation_3 = Value::Record(vec![
2524 (
2525 "outer_field_1".into(),
2526 Value::Union(1, Box::new(middle_record_variation_2)),
2527 ),
2528 ("outer_field_2".into(), inner_record),
2529 ]);
2530
2531 outer_record_variation_1
2532 .resolve(&schema)
2533 .expect("Should be able to resolve value to the schema that is it's definition");
2534 outer_record_variation_2
2535 .resolve(&schema)
2536 .expect("Should be able to resolve value to the schema that is it's definition");
2537 outer_record_variation_3
2538 .resolve(&schema)
2539 .expect("Should be able to resolve value to the schema that is it's definition");
2540
2541 Ok(())
2542 }
2543
2544 #[test]
2545 fn test_avro_3461_test_multi_level_resolve_inner_namespace() -> TestResult {
2546 let schema = r#"
2547 {
2548 "name": "record_name",
2549 "namespace": "space",
2550 "type": "record",
2551 "fields": [
2552 {
2553 "name": "outer_field_1",
2554 "type": [
2555 "null",
2556 {
2557 "type": "record",
2558 "name": "middle_record_name",
2559 "namespace":"middle_namespace",
2560 "fields":[
2561 {
2562 "name":"middle_field_1",
2563 "type":[
2564 "null",
2565 {
2566 "type":"record",
2567 "name":"inner_record_name",
2568 "namespace":"inner_namespace",
2569 "fields":[
2570 {
2571 "name":"inner_field_1",
2572 "type":"double"
2573 }
2574 ]
2575 }
2576 ]
2577 }
2578 ]
2579 }
2580 ]
2581 },
2582 {
2583 "name": "outer_field_2",
2584 "type" : "inner_namespace.inner_record_name"
2585 }
2586 ]
2587 }
2588 "#;
2589 let schema = Schema::parse_str(schema)?;
2590
2591 let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2592 let middle_record_variation_1 = Value::Record(vec![(
2593 "middle_field_1".into(),
2594 Value::Union(0, Box::new(Value::Null)),
2595 )]);
2596 let middle_record_variation_2 = Value::Record(vec![(
2597 "middle_field_1".into(),
2598 Value::Union(1, Box::new(inner_record.clone())),
2599 )]);
2600 let outer_record_variation_1 = Value::Record(vec![
2601 (
2602 "outer_field_1".into(),
2603 Value::Union(0, Box::new(Value::Null)),
2604 ),
2605 ("outer_field_2".into(), inner_record.clone()),
2606 ]);
2607 let outer_record_variation_2 = Value::Record(vec![
2608 (
2609 "outer_field_1".into(),
2610 Value::Union(1, Box::new(middle_record_variation_1)),
2611 ),
2612 ("outer_field_2".into(), inner_record.clone()),
2613 ]);
2614 let outer_record_variation_3 = Value::Record(vec![
2615 (
2616 "outer_field_1".into(),
2617 Value::Union(1, Box::new(middle_record_variation_2)),
2618 ),
2619 ("outer_field_2".into(), inner_record),
2620 ]);
2621
2622 outer_record_variation_1
2623 .resolve(&schema)
2624 .expect("Should be able to resolve value to the schema that is it's definition");
2625 outer_record_variation_2
2626 .resolve(&schema)
2627 .expect("Should be able to resolve value to the schema that is it's definition");
2628 outer_record_variation_3
2629 .resolve(&schema)
2630 .expect("Should be able to resolve value to the schema that is it's definition");
2631
2632 Ok(())
2633 }
2634
2635 #[test]
2636 fn test_avro_3460_validation_with_refs() -> TestResult {
2637 let schema = Schema::parse_str(
2638 r#"
2639 {
2640 "type":"record",
2641 "name":"TestStruct",
2642 "fields": [
2643 {
2644 "name":"a",
2645 "type":{
2646 "type":"record",
2647 "name": "Inner",
2648 "fields": [ {
2649 "name":"z",
2650 "type":"int"
2651 }]
2652 }
2653 },
2654 {
2655 "name":"b",
2656 "type":"Inner"
2657 }
2658 ]
2659 }"#,
2660 )?;
2661
2662 let inner_value_right = Value::Record(vec![("z".into(), Value::Int(3))]);
2663 let inner_value_wrong1 = Value::Record(vec![("z".into(), Value::Null)]);
2664 let inner_value_wrong2 = Value::Record(vec![("a".into(), Value::String("testing".into()))]);
2665 let outer1 = Value::Record(vec![
2666 ("a".into(), inner_value_right.clone()),
2667 ("b".into(), inner_value_wrong1),
2668 ]);
2669
2670 let outer2 = Value::Record(vec![
2671 ("a".into(), inner_value_right),
2672 ("b".into(), inner_value_wrong2),
2673 ]);
2674
2675 assert!(
2676 !outer1.validate(&schema),
2677 "field b record is invalid against the schema"
2678 ); assert!(
2680 !outer2.validate(&schema),
2681 "field b record is invalid against the schema"
2682 ); Ok(())
2685 }
2686
2687 #[test]
2688 fn test_avro_3460_validation_with_refs_real_struct() -> TestResult {
2689 use crate::ser::Serializer;
2690 use serde::Serialize;
2691
2692 #[derive(Serialize, Clone)]
2693 struct TestInner {
2694 z: i32,
2695 }
2696
2697 #[derive(Serialize)]
2698 struct TestRefSchemaStruct1 {
2699 a: TestInner,
2700 b: String, }
2702
2703 #[derive(Serialize)]
2704 struct TestRefSchemaStruct2 {
2705 a: TestInner,
2706 b: i32, }
2708
2709 #[derive(Serialize)]
2710 struct TestRefSchemaStruct3 {
2711 a: TestInner,
2712 b: Option<TestInner>, }
2714
2715 let schema = Schema::parse_str(
2716 r#"
2717 {
2718 "type":"record",
2719 "name":"TestStruct",
2720 "fields": [
2721 {
2722 "name":"a",
2723 "type":{
2724 "type":"record",
2725 "name": "Inner",
2726 "fields": [ {
2727 "name":"z",
2728 "type":"int"
2729 }]
2730 }
2731 },
2732 {
2733 "name":"b",
2734 "type":"Inner"
2735 }
2736 ]
2737 }"#,
2738 )?;
2739
2740 let test_inner = TestInner { z: 3 };
2741 let test_outer1 = TestRefSchemaStruct1 {
2742 a: test_inner.clone(),
2743 b: "testing".into(),
2744 };
2745 let test_outer2 = TestRefSchemaStruct2 {
2746 a: test_inner.clone(),
2747 b: 24,
2748 };
2749 let test_outer3 = TestRefSchemaStruct3 {
2750 a: test_inner,
2751 b: None,
2752 };
2753
2754 let mut ser = Serializer::default();
2755 let test_outer1: Value = test_outer1.serialize(&mut ser)?;
2756 let mut ser = Serializer::default();
2757 let test_outer2: Value = test_outer2.serialize(&mut ser)?;
2758 let mut ser = Serializer::default();
2759 let test_outer3: Value = test_outer3.serialize(&mut ser)?;
2760
2761 assert!(
2762 !test_outer1.validate(&schema),
2763 "field b record is invalid against the schema"
2764 );
2765 assert!(
2766 !test_outer2.validate(&schema),
2767 "field b record is invalid against the schema"
2768 );
2769 assert!(
2770 !test_outer3.validate(&schema),
2771 "field b record is invalid against the schema"
2772 );
2773
2774 Ok(())
2775 }
2776
2777 fn avro_3674_with_or_without_namespace(with_namespace: bool) -> TestResult {
2778 use crate::ser::Serializer;
2779 use serde::Serialize;
2780
2781 let schema_str = r#"
2782 {
2783 "type": "record",
2784 "name": "NamespacedMessage",
2785 [NAMESPACE]
2786 "fields": [
2787 {
2788 "name": "field_a",
2789 "type": {
2790 "type": "record",
2791 "name": "NestedMessage",
2792 "fields": [
2793 {
2794 "name": "enum_a",
2795 "type": {
2796 "type": "enum",
2797 "name": "EnumType",
2798 "symbols": ["SYMBOL_1", "SYMBOL_2"],
2799 "default": "SYMBOL_1"
2800 }
2801 },
2802 {
2803 "name": "enum_b",
2804 "type": "EnumType"
2805 }
2806 ]
2807 }
2808 }
2809 ]
2810 }
2811 "#;
2812 let schema_str = schema_str.replace(
2813 "[NAMESPACE]",
2814 if with_namespace {
2815 r#""namespace": "com.domain","#
2816 } else {
2817 ""
2818 },
2819 );
2820
2821 let schema = Schema::parse_str(&schema_str)?;
2822
2823 #[derive(Serialize)]
2824 enum EnumType {
2825 #[serde(rename = "SYMBOL_1")]
2826 Symbol1,
2827 #[serde(rename = "SYMBOL_2")]
2828 Symbol2,
2829 }
2830
2831 #[derive(Serialize)]
2832 struct FieldA {
2833 enum_a: EnumType,
2834 enum_b: EnumType,
2835 }
2836
2837 #[derive(Serialize)]
2838 struct NamespacedMessage {
2839 field_a: FieldA,
2840 }
2841
2842 let msg = NamespacedMessage {
2843 field_a: FieldA {
2844 enum_a: EnumType::Symbol2,
2845 enum_b: EnumType::Symbol1,
2846 },
2847 };
2848
2849 let mut ser = Serializer::default();
2850 let test_value: Value = msg.serialize(&mut ser)?;
2851 assert!(test_value.validate(&schema), "test_value should validate");
2852 assert!(
2853 test_value.resolve(&schema).is_ok(),
2854 "test_value should resolve"
2855 );
2856
2857 Ok(())
2858 }
2859
2860 #[test]
2861 fn test_avro_3674_validate_no_namespace_resolution() -> TestResult {
2862 avro_3674_with_or_without_namespace(false)
2863 }
2864
2865 #[test]
2866 fn test_avro_3674_validate_with_namespace_resolution() -> TestResult {
2867 avro_3674_with_or_without_namespace(true)
2868 }
2869
2870 fn avro_3688_schema_resolution_panic(set_field_b: bool) -> TestResult {
2871 use crate::ser::Serializer;
2872 use serde::{Deserialize, Serialize};
2873
2874 let schema_str = r#"{
2875 "type": "record",
2876 "name": "Message",
2877 "fields": [
2878 {
2879 "name": "field_a",
2880 "type": [
2881 "null",
2882 {
2883 "name": "Inner",
2884 "type": "record",
2885 "fields": [
2886 {
2887 "name": "inner_a",
2888 "type": "string"
2889 }
2890 ]
2891 }
2892 ],
2893 "default": null
2894 },
2895 {
2896 "name": "field_b",
2897 "type": [
2898 "null",
2899 "Inner"
2900 ],
2901 "default": null
2902 }
2903 ]
2904 }"#;
2905
2906 #[derive(Serialize, Deserialize)]
2907 struct Inner {
2908 inner_a: String,
2909 }
2910
2911 #[derive(Serialize, Deserialize)]
2912 struct Message {
2913 field_a: Option<Inner>,
2914 field_b: Option<Inner>,
2915 }
2916
2917 let schema = Schema::parse_str(schema_str)?;
2918
2919 let msg = Message {
2920 field_a: Some(Inner {
2921 inner_a: "foo".to_string(),
2922 }),
2923 field_b: if set_field_b {
2924 Some(Inner {
2925 inner_a: "bar".to_string(),
2926 })
2927 } else {
2928 None
2929 },
2930 };
2931
2932 let mut ser = Serializer::default();
2933 let test_value: Value = msg.serialize(&mut ser)?;
2934 assert!(test_value.validate(&schema), "test_value should validate");
2935 assert!(
2936 test_value.resolve(&schema).is_ok(),
2937 "test_value should resolve"
2938 );
2939
2940 Ok(())
2941 }
2942
2943 #[test]
2944 fn test_avro_3688_field_b_not_set() -> TestResult {
2945 avro_3688_schema_resolution_panic(false)
2946 }
2947
2948 #[test]
2949 fn test_avro_3688_field_b_set() -> TestResult {
2950 avro_3688_schema_resolution_panic(true)
2951 }
2952
2953 #[test]
2954 fn test_avro_3764_use_resolve_schemata() -> TestResult {
2955 let referenced_schema =
2956 r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
2957 let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": "enumForReference"}]}"#;
2958
2959 let value: serde_json::Value = serde_json::from_str(
2960 r#"
2961 {
2962 "reference": "A"
2963 }
2964 "#,
2965 )?;
2966
2967 let avro_value = Value::from(value);
2968
2969 let schemas = Schema::parse_list([main_schema, referenced_schema])?;
2970
2971 let main_schema = schemas.first().unwrap();
2972 let schemata: Vec<_> = schemas.iter().skip(1).collect();
2973
2974 let resolve_result = avro_value.clone().resolve_schemata(main_schema, schemata);
2975
2976 assert!(
2977 resolve_result.is_ok(),
2978 "result of resolving with schemata should be ok, got: {resolve_result:?}"
2979 );
2980
2981 let resolve_result = avro_value.resolve(main_schema);
2982 assert!(
2983 resolve_result.is_err(),
2984 "result of resolving without schemata should be err, got: {resolve_result:?}"
2985 );
2986
2987 Ok(())
2988 }
2989
2990 #[test]
2991 fn test_avro_3767_union_resolve_complex_refs() -> TestResult {
2992 let referenced_enum =
2993 r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
2994 let referenced_record = r#"{"name": "recordForReference", "type": "record", "fields": [{"name": "refInRecord", "type": "enumForReference"}]}"#;
2995 let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": ["null", "recordForReference"]}]}"#;
2996
2997 let value: serde_json::Value = serde_json::from_str(
2998 r#"
2999 {
3000 "reference": {
3001 "refInRecord": "A"
3002 }
3003 }
3004 "#,
3005 )?;
3006
3007 let avro_value = Value::from(value);
3008
3009 let schemata = Schema::parse_list([referenced_enum, referenced_record, main_schema])?;
3010
3011 let main_schema = schemata.last().unwrap();
3012 let other_schemata: Vec<&Schema> = schemata.iter().take(2).collect();
3013
3014 let resolve_result = avro_value.resolve_schemata(main_schema, other_schemata);
3015
3016 assert!(
3017 resolve_result.is_ok(),
3018 "result of resolving with schemata should be ok, got: {resolve_result:?}"
3019 );
3020
3021 assert!(
3022 resolve_result?.validate_schemata(schemata.iter().collect()),
3023 "result of validation with schemata should be true"
3024 );
3025
3026 Ok(())
3027 }
3028
3029 #[test]
3030 fn test_avro_3782_incorrect_decimal_resolving() -> TestResult {
3031 let schema = r#"{"name": "decimalSchema", "logicalType": "decimal", "type": "fixed", "precision": 8, "scale": 0, "size": 8}"#;
3032
3033 let avro_value = Value::Decimal(Decimal::from(
3034 BigInt::from(12345678u32).to_signed_bytes_be(),
3035 ));
3036 let schema = Schema::parse_str(schema)?;
3037 let resolve_result = avro_value.resolve(&schema);
3038 assert!(
3039 resolve_result.is_ok(),
3040 "resolve result must be ok, got: {resolve_result:?}"
3041 );
3042
3043 Ok(())
3044 }
3045
3046 #[test]
3047 fn test_avro_3779_bigdecimal_resolving() -> TestResult {
3048 let schema =
3049 r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#;
3050
3051 let avro_value = Value::BigDecimal(BigDecimal::from(12345678u32));
3052 let schema = Schema::parse_str(schema)?;
3053 let resolve_result: AvroResult<Value> = avro_value.resolve(&schema);
3054 assert!(
3055 resolve_result.is_ok(),
3056 "resolve result must be ok, got: {resolve_result:?}"
3057 );
3058
3059 Ok(())
3060 }
3061
3062 #[test]
3063 fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult {
3064 let value = Value::Bytes(vec![97, 98, 99]);
3065 assert_eq!(
3066 value.resolve(&Schema::Fixed(FixedSchema {
3067 name: "test".into(),
3068 aliases: None,
3069 doc: None,
3070 size: 3,
3071 default: None,
3072 attributes: Default::default()
3073 }))?,
3074 Value::Fixed(3, vec![97, 98, 99])
3075 );
3076
3077 let value = Value::Bytes(vec![97, 99]);
3078 assert!(value
3079 .resolve(&Schema::Fixed(FixedSchema {
3080 name: "test".into(),
3081 aliases: None,
3082 doc: None,
3083 size: 3,
3084 default: None,
3085 attributes: Default::default()
3086 }))
3087 .is_err(),);
3088
3089 let value = Value::Bytes(vec![97, 98, 99, 100]);
3090 assert!(value
3091 .resolve(&Schema::Fixed(FixedSchema {
3092 name: "test".into(),
3093 aliases: None,
3094 doc: None,
3095 size: 3,
3096 default: None,
3097 attributes: Default::default()
3098 }))
3099 .is_err(),);
3100
3101 Ok(())
3102 }
3103
3104 #[test]
3105 fn avro_3928_from_serde_value_to_types_value() {
3106 assert_eq!(Value::from(serde_json::Value::Null), Value::Null);
3107 assert_eq!(Value::from(json!(true)), Value::Boolean(true));
3108 assert_eq!(Value::from(json!(false)), Value::Boolean(false));
3109 assert_eq!(Value::from(json!(0)), Value::Int(0));
3110 assert_eq!(Value::from(json!(i32::MIN)), Value::Int(i32::MIN));
3111 assert_eq!(Value::from(json!(i32::MAX)), Value::Int(i32::MAX));
3112 assert_eq!(
3113 Value::from(json!(i32::MIN as i64 - 1)),
3114 Value::Long(i32::MIN as i64 - 1)
3115 );
3116 assert_eq!(
3117 Value::from(json!(i32::MAX as i64 + 1)),
3118 Value::Long(i32::MAX as i64 + 1)
3119 );
3120 assert_eq!(Value::from(json!(1.23)), Value::Double(1.23));
3121 assert_eq!(Value::from(json!(-1.23)), Value::Double(-1.23));
3122 assert_eq!(Value::from(json!(u64::MIN)), Value::Int(u64::MIN as i32));
3123 assert_eq!(Value::from(json!(u64::MAX)), Value::Long(u64::MAX as i64));
3124 assert_eq!(
3125 Value::from(json!("some text")),
3126 Value::String("some text".into())
3127 );
3128 assert_eq!(
3129 Value::from(json!(["text1", "text2", "text3"])),
3130 Value::Array(vec![
3131 Value::String("text1".into()),
3132 Value::String("text2".into()),
3133 Value::String("text3".into())
3134 ])
3135 );
3136 assert_eq!(
3137 Value::from(json!({"key1": "value1", "key2": "value2"})),
3138 Value::Map(
3139 vec![
3140 ("key1".into(), Value::String("value1".into())),
3141 ("key2".into(), Value::String("value2".into()))
3142 ]
3143 .into_iter()
3144 .collect()
3145 )
3146 );
3147 }
3148
3149 #[test]
3150 fn avro_4024_resolve_double_from_unknown_string_err() -> TestResult {
3151 let schema = Schema::parse_str(r#"{"type": "double"}"#)?;
3152 let value = Value::String("unknown".to_owned());
3153 match value.resolve(&schema) {
3154 Err(err @ Error::GetDouble(_)) => {
3155 assert_eq!(
3156 format!("{err:?}"),
3157 r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3158 );
3159 }
3160 other => {
3161 panic!("Expected Error::GetDouble, got {other:?}");
3162 }
3163 }
3164 Ok(())
3165 }
3166
3167 #[test]
3168 fn avro_4024_resolve_float_from_unknown_string_err() -> TestResult {
3169 let schema = Schema::parse_str(r#"{"type": "float"}"#)?;
3170 let value = Value::String("unknown".to_owned());
3171 match value.resolve(&schema) {
3172 Err(err @ Error::GetFloat(_)) => {
3173 assert_eq!(
3174 format!("{err:?}"),
3175 r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3176 );
3177 }
3178 other => {
3179 panic!("Expected Error::GetFloat, got {other:?}");
3180 }
3181 }
3182 Ok(())
3183 }
3184
3185 #[test]
3186 fn avro_4029_resolve_from_unsupported_err() -> TestResult {
3187 let data: Vec<(&str, Value, &str)> = vec!(
3188 (r#"{ "name": "NAME", "type": "int" }"#, Value::Float(123_f32), "Expected Value::Int, got: Float(123.0)"),
3189 (r#"{ "name": "NAME", "type": "fixed", "size": 3 }"#, Value::Float(123_f32), "String expected for fixed, got: Float(123.0)"),
3190 (r#"{ "name": "NAME", "type": "bytes" }"#, Value::Float(123_f32), "Expected Value::Bytes, got: Float(123.0)"),
3191 (r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#, Value::String("abc-1234".into()), "Failed to convert &str to UUID: invalid group count: expected 5, found 2"),
3192 (r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#, Value::Float(123_f32), "Expected Value::Uuid, got: Float(123.0)"),
3193 (r#"{ "name": "NAME", "type": "bytes", "logicalType": "big-decimal" }"#, Value::Float(123_f32), "Expected Value::BigDecimal, got: Float(123.0)"),
3194 (r#"{ "name": "NAME", "type": "fixed", "size": 12, "logicalType": "duration" }"#, Value::Float(123_f32), "Expected Value::Duration or Value::Fixed(12), got: Float(123.0)"),
3195 (r#"{ "name": "NAME", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3 }"#, Value::Float(123_f32), "Expected Value::Decimal, Value::Bytes or Value::Fixed, got: Float(123.0)"),
3196 (r#"{ "name": "NAME", "type": "bytes" }"#, Value::Array(vec!(Value::Long(256_i64))), "Unable to convert to u8, got Int(256)"),
3197 (r#"{ "name": "NAME", "type": "int", "logicalType": "date" }"#, Value::Float(123_f32), "Expected Value::Date or Value::Int, got: Float(123.0)"),
3198 (r#"{ "name": "NAME", "type": "int", "logicalType": "time-millis" }"#, Value::Float(123_f32), "Expected Value::TimeMillis or Value::Int, got: Float(123.0)"),
3199 (r#"{ "name": "NAME", "type": "long", "logicalType": "time-micros" }"#, Value::Float(123_f32), "Expected Value::TimeMicros, Value::Long or Value::Int, got: Float(123.0)"),
3200 (r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-millis" }"#, Value::Float(123_f32), "Expected Value::TimestampMillis, Value::Long or Value::Int, got: Float(123.0)"),
3201 (r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-micros" }"#, Value::Float(123_f32), "Expected Value::TimestampMicros, Value::Long or Value::Int, got: Float(123.0)"),
3202 (r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-nanos" }"#, Value::Float(123_f32), "Expected Value::TimestampNanos, Value::Long or Value::Int, got: Float(123.0)"),
3203 (r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-millis" }"#, Value::Float(123_f32), "Expected Value::LocalTimestampMillis, Value::Long or Value::Int, got: Float(123.0)"),
3204 (r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-micros" }"#, Value::Float(123_f32), "Expected Value::LocalTimestampMicros, Value::Long or Value::Int, got: Float(123.0)"),
3205 (r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-nanos" }"#, Value::Float(123_f32), "Expected Value::LocalTimestampNanos, Value::Long or Value::Int, got: Float(123.0)"),
3206 (r#"{ "name": "NAME", "type": "null" }"#, Value::Float(123_f32), "Expected Value::Null, got: Float(123.0)"),
3207 (r#"{ "name": "NAME", "type": "boolean" }"#, Value::Float(123_f32), "Expected Value::Boolean, got: Float(123.0)"),
3208 (r#"{ "name": "NAME", "type": "int" }"#, Value::Float(123_f32), "Expected Value::Int, got: Float(123.0)"),
3209 (r#"{ "name": "NAME", "type": "long" }"#, Value::Float(123_f32), "Expected Value::Long or Value::Int, got: Float(123.0)"),
3210 (r#"{ "name": "NAME", "type": "float" }"#, Value::Boolean(false), r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#),
3211 (r#"{ "name": "NAME", "type": "double" }"#, Value::Boolean(false), r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#),
3212 (r#"{ "name": "NAME", "type": "string" }"#, Value::Boolean(false), "Expected Value::String, Value::Bytes or Value::Fixed, got: Boolean(false)"),
3213 (r#"{ "name": "NAME", "type": "enum", "symbols": ["one", "two"] }"#, Value::Boolean(false), "Expected Value::Enum, got: Boolean(false)"),
3214 );
3215
3216 for (schema_str, value, expected_error) in data {
3217 let schema = Schema::parse_str(schema_str)?;
3218 match value.resolve(&schema) {
3219 Err(error) => {
3220 assert_eq!(format!("{error}"), expected_error);
3221 }
3222 other => {
3223 panic!("Expected '{expected_error}', got {other:?}");
3224 }
3225 }
3226 }
3227 Ok(())
3228 }
3229
3230 #[test]
3231 fn avro_rs_130_get_from_record() -> TestResult {
3232 let schema = r#"
3233 {
3234 "type": "record",
3235 "name": "NamespacedMessage",
3236 "namespace": "space",
3237 "fields": [
3238 {
3239 "name": "foo",
3240 "type": "string"
3241 },
3242 {
3243 "name": "bar",
3244 "type": "long"
3245 }
3246 ]
3247 }
3248 "#;
3249
3250 let schema = Schema::parse_str(schema)?;
3251 let mut record = Record::new(&schema).unwrap();
3252 record.put("foo", "hello");
3253 record.put("bar", 123_i64);
3254
3255 assert_eq!(
3256 record.get("foo").unwrap(),
3257 &Value::String("hello".to_string())
3258 );
3259 assert_eq!(record.get("bar").unwrap(), &Value::Long(123));
3260
3261 assert_eq!(record.get("baz"), None);
3263
3264 Ok(())
3265 }
3266}