1#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23 ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24 UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
/// Magic bytes (`0xC3 0x01`) placed in front of a Rabin, MD5, or SHA-256 fingerprint when a
/// message prefix is built.
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// Magic byte (`0x00`) placed in front of a 32-bit or 64-bit schema ID when a message prefix
/// is built.
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// Capacity of the prefix buffer: the two-byte `SINGLE_OBJECT_MAGIC` plus the largest
/// fingerprint payload in this file, a 32-byte SHA-256 digest.
pub const MAX_PREFIX_LEN: usize = 34;

/// Arrow schema metadata key under which a complete Avro schema JSON string may be stored.
/// When present (and metadata is not being stripped) it is used verbatim as the Avro schema.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Metadata key for Avro enum symbols.
// NOTE(review): not referenced in this part of the file; confirm the value format at the use site.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key for an Avro field default value.
// NOTE(review): not referenced in this part of the file; confirm the value format at the use site.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Metadata key that supplies the Avro name to use for a record or other named type.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";

/// Metadata key that supplies the Avro namespace for a named type.
pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";

/// Metadata key that supplies the Avro `doc` string.
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Record name used for the root record when the Arrow schema metadata carries no
/// `avro.name` entry.
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
/// Position of the `"null"` branch when a type is wrapped in a nullable union.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) enum Nullability {
    /// `"null"` is the first union member, e.g. `["null", T]` (the default).
    #[default]
    NullFirst,
    /// `"null"` is placed after the other member(s), e.g. `[T, "null"]`.
    NullSecond,
}
80
/// A type referenced by name: either one of the built-in primitives or any other string,
/// which is kept as a reference to a named type.
///
/// Deserialized untagged, so a string that matches a [`PrimitiveType`] becomes
/// `Primitive` and every other string becomes `Ref`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum TypeName<'a> {
    /// One of the Avro primitive type names.
    Primitive(PrimitiveType),
    /// Any other name, borrowed from the schema JSON.
    Ref(&'a str),
}
95
/// The Avro primitive types.
///
/// Both the serde representation (`camelCase`) and the `AsRef<str>` representation
/// (`lowercase`) of these single-word variants are the lowercase type name, e.g. `"int"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub(crate) enum PrimitiveType {
    /// `"null"`
    Null,
    /// `"boolean"`
    Boolean,
    /// `"int"`
    Int,
    /// `"long"`
    Long,
    /// `"float"`
    Float,
    /// `"double"`
    Double,
    /// `"bytes"`
    Bytes,
    /// `"string"`
    String,
}
120
/// Attributes attached to a schema object beyond its structural fields.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Attributes<'a> {
    /// Value of the `logicalType` JSON key, if present.
    #[serde(default)]
    pub(crate) logical_type: Option<&'a str>,

    /// Every remaining JSON key/value pair not claimed by another field (serde `flatten`).
    #[serde(flatten)]
    pub(crate) additional: HashMap<&'a str, Value>,
}
137
138impl Attributes<'_> {
139 pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
141 self.additional
142 .iter()
143 .map(|(k, v)| (k.to_string(), v.to_string()))
144 .collect()
145 }
146}
147
/// A JSON object that names its type and carries extra attributes,
/// e.g. `{"type": "int", "logicalType": "date"}`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Type<'a> {
    /// The value of the `type` key: a primitive or a named-type reference.
    #[serde(borrow)]
    pub(crate) r#type: TypeName<'a>,
    /// All other keys of the object, flattened into [`Attributes`].
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
159
/// An Avro schema node as it appears in schema JSON.
///
/// Deserialized untagged: serde tries the variants in declaration order and keeps the
/// first one that matches, so the order below is significant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum Schema<'a> {
    /// A bare string: a primitive name or a reference to a named type.
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A JSON array of schemas, i.e. a union.
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// An object whose `type` is one of the complex kinds in [`ComplexType`].
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// Any other object carrying a `type` name plus attributes.
    #[serde(borrow)]
    Type(Type<'a>),
}
180
/// The complex Avro types, selected by the JSON `type` key
/// (`"record"`, `"enum"`, `"array"`, `"map"`, `"fixed"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub(crate) enum ComplexType<'a> {
    /// `"type": "record"`
    #[serde(borrow)]
    Record(Record<'a>),
    /// `"type": "enum"`
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// `"type": "array"`
    #[serde(borrow)]
    Array(Array<'a>),
    /// `"type": "map"`
    #[serde(borrow)]
    Map(Map<'a>),
    /// `"type": "fixed"`
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
203
/// An Avro record definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Record<'a> {
    /// Record name (required); may already be a dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternate names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The record's fields, in declaration order (required).
    #[serde(borrow)]
    pub(crate) fields: Vec<Field<'a>>,
    /// Any remaining keys of the record object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
228
229fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
230where
231 D: serde::Deserializer<'de>,
232{
233 Value::deserialize(deserializer).map(Some)
234}
235
/// A single field of a [`Record`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Field<'a> {
    /// Field name (required).
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional documentation string.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Schema of the field's value (required).
    #[serde(borrow)]
    pub(crate) r#type: Schema<'a>,
    /// Default value: `None` when the key is absent, `Some(value)` when it is present
    /// (see `deserialize_default`).
    #[serde(deserialize_with = "deserialize_default", default)]
    pub(crate) default: Option<Value>,
    /// Alternate names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
}
256
/// An Avro enum definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Enum<'a> {
    /// Enum name (required); may already be a dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternate names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The enum's symbols, in declaration order (required).
    #[serde(borrow)]
    pub(crate) symbols: Vec<&'a str>,
    /// Optional default symbol.
    #[serde(borrow, default)]
    pub(crate) default: Option<&'a str>,
    /// Any remaining keys of the enum object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
284
/// An Avro array definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Array<'a> {
    /// Schema of the array's elements (the `items` key).
    #[serde(borrow)]
    pub(crate) items: Box<Schema<'a>>,
    /// Any remaining keys of the array object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
297
/// An Avro map definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Map<'a> {
    /// Schema of the map's values (the `values` key).
    #[serde(borrow)]
    pub(crate) values: Box<Schema<'a>>,
    /// Any remaining keys of the map object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
310
/// An Avro fixed (fixed-size byte sequence) definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Fixed<'a> {
    /// Type name (required); may already be a dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Alternate names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// Number of bytes per value (required).
    pub(crate) size: usize,
    /// Any remaining keys of the fixed object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
331
/// Options controlling how an Arrow schema is converted to an Avro schema.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) struct AvroSchemaOptions {
    /// Where `"null"` is placed in nullable unions; `None` falls back to
    /// `Nullability::default()` (`NullFirst`).
    pub(crate) null_order: Option<Nullability>,
    /// When `true`, an Avro schema embedded under `avro.schema` in the Arrow metadata is
    /// ignored and the flag is forwarded to the per-field conversion.
    pub(crate) strip_metadata: bool,
}
337
/// An Avro schema held as its JSON text.
///
/// Equality compares the JSON strings byte-for-byte, not the parsed schemas.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The Avro schema as a JSON string.
    pub json_string: String,
}
344
345impl TryFrom<&ArrowSchema> for AvroSchema {
346 type Error = ArrowError;
347
348 fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
352 AvroSchema::from_arrow_with_options(schema, None)
353 }
354}
355
356impl AvroSchema {
357 pub fn new(json_string: String) -> Self {
359 Self { json_string }
360 }
361
362 pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
363 serde_json::from_str(self.json_string.as_str())
364 .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
365 }
366
367 pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
395 Self::generate_fingerprint(&self.schema()?, hash_type)
396 }
397
398 pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
399 let mut value: Value = serde_json::from_str(&self.json_string)
400 .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
401 let obj = value.as_object_mut().ok_or_else(|| {
402 ArrowError::AvroError(
403 "Projected schema must be a JSON object Avro record schema".to_string(),
404 )
405 })?;
406 match obj.get("type").and_then(|v| v.as_str()) {
407 Some("record") => {}
408 Some(other) => {
409 return Err(ArrowError::AvroError(format!(
410 "Projected schema must be an Avro record, found type '{other}'"
411 )));
412 }
413 None => {
414 return Err(ArrowError::AvroError(
415 "Projected schema missing required 'type' field".to_string(),
416 ));
417 }
418 }
419 let fields_val = obj.get_mut("fields").ok_or_else(|| {
420 ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
421 })?;
422 let projected_fields = {
423 let mut original_fields = match fields_val {
424 Value::Array(arr) => std::mem::take(arr),
425 _ => {
426 return Err(ArrowError::AvroError(
427 "Avro record schema 'fields' must be an array".to_string(),
428 ));
429 }
430 };
431 let len = original_fields.len();
432 let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
433 let mut out: Vec<Value> = Vec::with_capacity(projection.len());
434 for &i in projection {
435 if i >= len {
436 return Err(ArrowError::AvroError(format!(
437 "Projection index {i} out of bounds for record with {len} fields"
438 )));
439 }
440 if !seen.insert(i) {
441 return Err(ArrowError::AvroError(format!(
442 "Duplicate projection index {i}"
443 )));
444 }
445 out.push(std::mem::replace(&mut original_fields[i], Value::Null));
446 }
447 out
448 };
449 *fields_val = Value::Array(projected_fields);
450 let json_string = serde_json::to_string(&value).map_err(|e| {
451 ArrowError::AvroError(format!(
452 "Failed to serialize projected Avro schema JSON: {e}"
453 ))
454 })?;
455 Ok(Self::new(json_string))
456 }
457
458 pub(crate) fn generate_fingerprint(
459 schema: &Schema,
460 hash_type: FingerprintAlgorithm,
461 ) -> Result<Fingerprint, ArrowError> {
462 let canonical = Self::generate_canonical_form(schema).map_err(|e| {
463 ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
464 })?;
465 match hash_type {
466 FingerprintAlgorithm::Rabin => {
467 Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
468 }
469 FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
470 "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
471 if using Fingerprint::Id, pass the registry ID in instead using the set method."
472 .to_string(),
473 )),
474 #[cfg(feature = "md5")]
475 FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
476 #[cfg(feature = "sha256")]
477 FingerprintAlgorithm::SHA256 => {
478 Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
479 }
480 }
481 }
482
483 pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
494 build_canonical(schema, None)
495 }
496
497 pub(crate) fn from_arrow_with_options(
504 schema: &ArrowSchema,
505 options: Option<AvroSchemaOptions>,
506 ) -> Result<AvroSchema, ArrowError> {
507 let opts = options.unwrap_or_default();
508 let order = opts.null_order.unwrap_or_default();
509 let strip = opts.strip_metadata;
510 if !strip {
511 if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
512 return Ok(AvroSchema::new(json.clone()));
513 }
514 }
515 let mut name_gen = NameGenerator::default();
516 let fields_json = schema
517 .fields()
518 .iter()
519 .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
520 .collect::<Result<Vec<_>, _>>()?;
521 let record_name = schema
522 .metadata
523 .get(AVRO_NAME_METADATA_KEY)
524 .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
525 let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
526 record.insert("type".into(), Value::String("record".into()));
527 record.insert(
528 "name".into(),
529 Value::String(sanitise_avro_name(record_name)),
530 );
531 if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
532 record.insert("namespace".into(), Value::String(ns.clone()));
533 }
534 if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
535 record.insert("doc".into(), Value::String(doc.clone()));
536 }
537 record.insert("fields".into(), Value::Array(fields_json));
538 extend_with_passthrough_metadata(&mut record, &schema.metadata);
539 let json_string = serde_json::to_string(&Value::Object(record))
540 .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
541 Ok(AvroSchema::new(json_string))
542 }
543}
544
/// A message prefix (magic bytes followed by a fingerprint payload) held in a fixed-size
/// buffer.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Prefix {
    /// Backing storage; only the first `len` bytes are meaningful.
    buf: [u8; MAX_PREFIX_LEN],
    /// Number of valid bytes at the start of `buf`.
    len: u8,
}
551
552impl Prefix {
553 #[inline]
554 pub(crate) fn as_slice(&self) -> &[u8] {
555 &self.buf[..self.len as usize]
556 }
557}
558
/// How a schema is identified: by a computed hash of the schema or by an explicit ID.
///
/// Unlike [`FingerprintAlgorithm`], the `Id` / `Id64` variants carry the identifier value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// Rabin fingerprint of the schema (the default).
    #[default]
    Rabin,
    /// An explicit 32-bit schema ID.
    Id(u32),
    /// An explicit 64-bit schema ID.
    Id64(u64),
    /// MD5 hash of the schema (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 hash of the schema (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
576
577impl From<Fingerprint> for FingerprintStrategy {
578 fn from(f: Fingerprint) -> Self {
579 Self::from(&f)
580 }
581}
582
583impl From<FingerprintAlgorithm> for FingerprintStrategy {
584 fn from(f: FingerprintAlgorithm) -> Self {
585 match f {
586 FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
587 FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
588 FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
589 #[cfg(feature = "md5")]
590 FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
591 #[cfg(feature = "sha256")]
592 FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
593 }
594 }
595}
596
597impl From<&Fingerprint> for FingerprintStrategy {
598 fn from(f: &Fingerprint) -> Self {
599 match f {
600 Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
601 Fingerprint::Id(_) => FingerprintStrategy::Id(0),
602 Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
603 #[cfg(feature = "md5")]
604 Fingerprint::MD5(_) => FingerprintStrategy::MD5,
605 #[cfg(feature = "sha256")]
606 Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
607 }
608 }
609}
610
/// The kind of fingerprint used to identify a schema, without any payload.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit Rabin fingerprint computed from the schema's canonical form (the default).
    #[default]
    Rabin,
    /// A 32-bit schema ID. It cannot be computed from a schema and must be supplied.
    Id,
    /// A 64-bit schema ID. It cannot be computed from a schema and must be supplied.
    Id64,
    /// 16-byte MD5 digest of the canonical form (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// 32-byte SHA-256 digest of the canonical form (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
629
630impl From<&Fingerprint> for FingerprintAlgorithm {
632 fn from(fp: &Fingerprint) -> Self {
633 match fp {
634 Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
635 Fingerprint::Id(_) => FingerprintAlgorithm::Id,
636 Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
637 #[cfg(feature = "md5")]
638 Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
639 #[cfg(feature = "sha256")]
640 Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
641 }
642 }
643}
644
645impl From<FingerprintStrategy> for FingerprintAlgorithm {
646 fn from(s: FingerprintStrategy) -> Self {
647 Self::from(&s)
648 }
649}
650
651impl From<&FingerprintStrategy> for FingerprintAlgorithm {
652 fn from(s: &FingerprintStrategy) -> Self {
653 match s {
654 FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
655 FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
656 FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
657 #[cfg(feature = "md5")]
658 FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
659 #[cfg(feature = "sha256")]
660 FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
661 }
662 }
663}
664
/// A concrete schema identifier: a computed hash value or an explicit schema ID.
///
/// Used as the key of [`SchemaStore`] and to build message prefixes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// 64-bit Rabin fingerprint.
    Rabin(u64),
    /// 32-bit schema ID.
    Id(u32),
    /// 64-bit schema ID.
    Id64(u64),
    /// 16-byte MD5 digest (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5([u8; 16]),
    /// 32-byte SHA-256 digest (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256([u8; 32]),
}
688
689impl From<FingerprintStrategy> for Fingerprint {
690 fn from(s: FingerprintStrategy) -> Self {
691 Self::from(&s)
692 }
693}
694
695impl From<&FingerprintStrategy> for Fingerprint {
696 fn from(s: &FingerprintStrategy) -> Self {
697 match s {
698 FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
699 FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
700 FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
701 #[cfg(feature = "md5")]
702 FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
703 #[cfg(feature = "sha256")]
704 FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
705 }
706 }
707}
708
709impl From<FingerprintAlgorithm> for Fingerprint {
710 fn from(s: FingerprintAlgorithm) -> Self {
711 match s {
712 FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
713 FingerprintAlgorithm::Id => Fingerprint::Id(0),
714 FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
715 #[cfg(feature = "md5")]
716 FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
717 #[cfg(feature = "sha256")]
718 FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
719 }
720 }
721}
722
723impl Fingerprint {
724 pub fn load_fingerprint_id(id: u32) -> Self {
732 Fingerprint::Id(u32::from_be(id))
733 }
734
735 pub fn load_fingerprint_id64(id: u64) -> Self {
743 Fingerprint::Id64(u64::from_be(id))
744 }
745
746 pub(crate) fn make_prefix(&self) -> Prefix {
770 let mut buf = [0u8; MAX_PREFIX_LEN];
771 let len = match self {
772 Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
773 Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
774 Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
775 #[cfg(feature = "md5")]
776 Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
777 #[cfg(feature = "sha256")]
778 Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
779 };
780 Prefix { buf, len }
781 }
782}
783
784fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
785 buf: &mut [u8; MAX_PREFIX_LEN],
786 magic: &[u8; MAGIC_LEN],
787 payload: &[u8; PAYLOAD_LEN],
788) -> u8 {
789 debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
790 let total = MAGIC_LEN + PAYLOAD_LEN;
791 let prefix_slice = &mut buf[..total];
792 prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
793 prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
794 total as u8
795}
796
/// An in-memory collection of Avro schemas keyed by fingerprint.
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// Algorithm used by `register` to compute the fingerprint of a new schema.
    fingerprint_algorithm: FingerprintAlgorithm,
    /// The stored schemas, keyed by their fingerprint.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
830
831impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
832 type Error = ArrowError;
833
834 fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
837 Ok(Self {
838 schemas,
839 ..Self::default()
840 })
841 }
842}
843
844impl SchemaStore {
845 pub fn new() -> Self {
847 Self::default()
848 }
849
850 pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
852 Self {
853 fingerprint_algorithm,
854 ..Self::default()
855 }
856 }
857
858 pub fn set(
875 &mut self,
876 fingerprint: Fingerprint,
877 schema: AvroSchema,
878 ) -> Result<Fingerprint, ArrowError> {
879 match self.schemas.entry(fingerprint) {
880 Entry::Occupied(entry) => {
881 if entry.get() != &schema {
882 return Err(ArrowError::ComputeError(format!(
883 "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
884 )));
885 }
886 }
887 Entry::Vacant(entry) => {
888 entry.insert(schema);
889 }
890 }
891 Ok(fingerprint)
892 }
893
894 pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
912 if self.fingerprint_algorithm == FingerprintAlgorithm::Id
913 || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
914 {
915 return Err(ArrowError::SchemaError(
916 "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
917 Use the set method directly instead, providing a valid fingerprint"
918 .to_string(),
919 ));
920 }
921 let fingerprint =
922 AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
923 self.set(fingerprint, schema)?;
924 Ok(fingerprint)
925 }
926
927 pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
937 self.schemas.get(fingerprint)
938 }
939
940 pub fn fingerprints(&self) -> Vec<Fingerprint> {
946 self.schemas.keys().copied().collect()
947 }
948
949 pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
951 self.fingerprint_algorithm
952 }
953}
954
955fn quote(s: &str) -> Result<String, ArrowError> {
956 serde_json::to_string(s)
957 .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
958}
959
/// Resolves a type name to its full name and the namespace it lives in.
///
/// * If `name` contains a `.`, it is treated as already qualified: it is returned
///   unchanged and the namespace is everything before the last `.`; `namespace_attr` and
///   `enclosing_ns` are ignored.
/// * Otherwise the namespace is `namespace_attr`, falling back to `enclosing_ns`; when one
///   is available the full name is `"{namespace}.{name}"`.
/// * With no namespace at all, the name is returned as-is with `None`.
///
/// Returns `(full_name, namespace)`.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    if let Some(last_dot) = name.rfind('.') {
        let namespace = &name[..last_dot];
        return (name.to_owned(), Some(namespace.to_owned()));
    }
    let namespace = namespace_attr.or(enclosing_ns);
    let full_name = match namespace {
        Some(ns) => format!("{ns}.{name}"),
        None => name.to_owned(),
    };
    (full_name, namespace.map(str::to_owned))
}
990
991fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
992 Ok(match schema {
993 Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
994 TypeName::Primitive(pt) => quote(pt.as_ref())?,
995 TypeName::Ref(name) => {
996 let (full_name, _) = make_full_name(name, None, enclosing_ns);
997 quote(&full_name)?
998 }
999 },
1000 Schema::Union(branches) => format!(
1001 "[{}]",
1002 branches
1003 .iter()
1004 .map(|b| build_canonical(b, enclosing_ns))
1005 .collect::<Result<Vec<_>, _>>()?
1006 .join(",")
1007 ),
1008 Schema::Complex(ct) => match ct {
1009 ComplexType::Record(r) => {
1010 let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
1011 let fields = r
1012 .fields
1013 .iter()
1014 .map(|f| {
1015 let field_type =
1020 build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
1021 Ok(format!(
1022 r#"{{"name":{},"type":{}}}"#,
1023 quote(f.name)?,
1024 field_type
1025 ))
1026 })
1027 .collect::<Result<Vec<_>, ArrowError>>()?
1028 .join(",");
1029 format!(
1030 r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
1031 quote(&full_name)?,
1032 )
1033 }
1034 ComplexType::Enum(e) => {
1035 let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
1036 let symbols = e
1037 .symbols
1038 .iter()
1039 .map(|s| quote(s))
1040 .collect::<Result<Vec<_>, _>>()?
1041 .join(",");
1042 format!(
1043 r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
1044 quote(&full_name)?
1045 )
1046 }
1047 ComplexType::Array(arr) => format!(
1048 r#"{{"type":"array","items":{}}}"#,
1049 build_canonical(&arr.items, enclosing_ns)?
1050 ),
1051 ComplexType::Map(map) => format!(
1052 r#"{{"type":"map","values":{}}}"#,
1053 build_canonical(&map.values, enclosing_ns)?
1054 ),
1055 ComplexType::Fixed(f) => {
1056 let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
1057 format!(
1058 r#"{{"name":{},"type":"fixed","size":{}}}"#,
1059 quote(&full_name)?,
1060 f.size
1061 )
1062 }
1063 },
1064 })
1065}
1066
/// Initial value of the 64-bit Rabin fingerprint. It is also the constant XOR-ed in by
/// `one_entry` whenever the bit shifted out is set.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Computes the lookup-table entry for byte value `i`.
///
/// Starting from `i`, performs eight steps; each step shifts the value right by one bit
/// and XORs in `EMPTY` when the bit shifted out was 1.
const fn one_entry(i: usize) -> u64 {
    let mut fp = i as u64;
    let mut remaining = 8;
    while remaining > 0 {
        let shifted = fp >> 1;
        fp = if fp & 1 == 1 { shifted ^ EMPTY } else { shifted };
        remaining -= 1;
    }
    fp
}

/// Builds the 256-entry lookup table at compile time, one entry per possible byte value.
///
/// `while` loops are used because `for` loops are not available in `const fn`.
const fn build_table() -> [u64; 256] {
    const ENTRIES: usize = 256;
    let mut table = [0u64; ENTRIES];
    let mut byte = 0;
    while byte < ENTRIES {
        table[byte] = one_entry(byte);
        byte += 1;
    }
    table
}

/// Precomputed fingerprint lookup table, indexed by byte value.
static FINGERPRINT_TABLE: [u64; 256] = build_table();

/// Computes the 64-bit Rabin fingerprint of `canonical_form`.
///
/// Starts from `EMPTY` and folds in the UTF-8 bytes one at a time using the lookup
/// table, so the empty string yields `EMPTY` itself.
pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
    canonical_form.bytes().fold(EMPTY, |fp, byte| {
        // The table index is the low byte of the running fingerprint XOR the input byte.
        let idx = usize::from((fp as u8) ^ byte);
        (fp >> 8) ^ FINGERPRINT_TABLE[idx]
    })
}
1115
/// Computes the 16-byte MD5 digest of the canonical form's UTF-8 bytes.
///
/// Only compiled when the `md5` feature is enabled.
#[cfg(feature = "md5")]
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    md5::compute(canonical_form.as_bytes()).0
}
1126
/// Computes the 32-byte SHA-256 digest of the canonical form's UTF-8 bytes.
///
/// Only compiled when the `sha256` feature is enabled.
#[cfg(feature = "sha256")]
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    // One-shot helper from the `Digest` trait: equivalent to new + update + finalize.
    Sha256::digest(canonical_form.as_bytes()).into()
}
1138
1139#[inline]
1140fn is_internal_arrow_key(key: &str) -> bool {
1141 key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1142}
1143
1144fn extend_with_passthrough_metadata(
1149 target: &mut JsonMap<String, Value>,
1150 metadata: &HashMap<String, String>,
1151) {
1152 for (meta_key, meta_val) in metadata {
1153 if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1154 continue;
1155 }
1156 let json_val =
1157 serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1158 target.insert(meta_key.clone(), json_val);
1159 }
1160}
1161
/// Turns an arbitrary string into a valid Avro name.
///
/// * An empty input becomes `"_"`.
/// * Every character other than an ASCII letter, ASCII digit, or `_` is replaced by `_`
///   (one `_` per character, including multi-byte characters).
/// * If the result starts with a digit, `_` is prepended.
fn sanitise_avro_name(base_name: &str) -> String {
    if base_name.is_empty() {
        return "_".to_owned();
    }
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    for ch in base_name.chars() {
        sanitised.push(match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '_' => ch,
            _ => '_',
        });
    }
    if sanitised.starts_with(|first: char| first.is_ascii_digit()) {
        sanitised.insert(0, '_');
    }
    sanitised
}
1182
/// Hands out unique, sanitised Avro names within one schema conversion.
#[derive(Default)]
struct NameGenerator {
    /// Every name returned so far.
    used: HashSet<String>,
    /// For each sanitised base name, the next numeric suffix to try on a collision.
    counters: HashMap<String, usize>,
}
1188
1189impl NameGenerator {
1190 fn make_unique(&mut self, field_name: &str) -> String {
1191 let field_name = sanitise_avro_name(field_name);
1192 if self.used.insert(field_name.clone()) {
1193 self.counters.insert(field_name.clone(), 1);
1194 return field_name;
1195 }
1196 let counter = self.counters.entry(field_name.clone()).or_insert(1);
1197 loop {
1198 let candidate = format!("{field_name}_{}", *counter);
1199 if self.used.insert(candidate.clone()) {
1200 return candidate;
1201 }
1202 *counter += 1;
1203 }
1204 }
1205}
1206
1207fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1208 if extras.is_empty() {
1209 return schema;
1210 }
1211 match schema {
1212 Value::Object(mut map) => {
1213 map.extend(extras);
1214 Value::Object(map)
1215 }
1216 Value::Array(mut union) => {
1217 if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1220 let original = std::mem::take(non_null);
1221 *non_null = merge_extras(original, extras);
1222 }
1223 Value::Array(union)
1224 }
1225 primitive => {
1226 let mut map = JsonMap::with_capacity(extras.len() + 1);
1227 map.insert("type".into(), primitive);
1228 map.extend(extras);
1229 Value::Object(map)
1230 }
1231 }
1232}
1233
1234#[inline]
1235fn is_avro_json_null(v: &Value) -> bool {
1236 matches!(v, Value::String(s) if s == "null")
1237}
1238
1239fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1240 let null = Value::String("null".into());
1241 match inner {
1242 Value::Array(mut union) => {
1243 if union.iter().any(is_avro_json_null) {
1248 return Value::Array(union);
1249 }
1250 match null_order {
1252 Nullability::NullFirst => union.insert(0, null),
1253 Nullability::NullSecond => union.push(null),
1254 }
1255 Value::Array(union)
1256 }
1257 other => match null_order {
1258 Nullability::NullFirst => Value::Array(vec![null, other]),
1259 Nullability::NullSecond => Value::Array(vec![other, null]),
1260 },
1261 }
1262}
1263
/// Returns the smallest byte width (1..=32) whose maximum decimal precision is at least `p`.
///
/// `MAX_P[n - 1]` is the largest precision that fits in `n` bytes. Precisions beyond the
/// last table entry (76) saturate at 32 bytes.
fn min_fixed_bytes_for_precision(p: usize) -> usize {
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    MAX_P
        .iter()
        .position(|&max_p| p <= max_p)
        .map_or(32, |index| index + 1)
}
1278
1279fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1280 match branch {
1281 Value::String(t) => Ok(format!("P:{t}")),
1282 Value::Object(map) => {
1283 let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1284 ArrowError::SchemaError("Union branch object missing string 'type'".into())
1285 })?;
1286 match t {
1287 "record" | "enum" | "fixed" => {
1288 let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1289 ArrowError::SchemaError(format!(
1290 "Union branch '{t}' missing required 'name'"
1291 ))
1292 })?;
1293 Ok(format!("N:{t}:{name}"))
1294 }
1295 "array" | "map" => Ok(format!("C:{t}")),
1296 other => Ok(format!("P:{other}")),
1297 }
1298 }
1299 Value::Array(_) => Err(ArrowError::SchemaError(
1300 "Avro union may not immediately contain another union".into(),
1301 )),
1302 _ => Err(ArrowError::SchemaError(
1303 "Invalid JSON for Avro union branch".into(),
1304 )),
1305 }
1306}
1307
1308fn datatype_to_avro(
1309 dt: &DataType,
1310 field_name: &str,
1311 metadata: &HashMap<String, String>,
1312 name_gen: &mut NameGenerator,
1313 null_order: Nullability,
1314 strip: bool,
1315) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1316 let mut extras = JsonMap::new();
1317 let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1318 if *scale < 0 {
1319 return Err(ArrowError::SchemaError(format!(
1320 "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1321 )));
1322 }
1323 if (*scale as usize) > (*precision as usize) {
1324 return Err(ArrowError::SchemaError(format!(
1325 "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1326 must be <= precision ({precision})"
1327 )));
1328 }
1329 let mut meta = JsonMap::from_iter([
1330 ("logicalType".into(), json!("decimal")),
1331 ("precision".into(), json!(*precision)),
1332 ("scale".into(), json!(*scale)),
1333 ]);
1334 let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1335 let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1336 || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1337 if fixed_size.is_none() && carries_name {
1338 fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1339 }
1340 if let Some(size) = fixed_size {
1341 meta.insert("type".into(), json!("fixed"));
1342 meta.insert("size".into(), json!(size));
1343 let chosen_name = metadata
1344 .get(AVRO_NAME_METADATA_KEY)
1345 .map(|s| sanitise_avro_name(s))
1346 .unwrap_or_else(|| name_gen.make_unique(field_name));
1347 meta.insert("name".into(), json!(chosen_name));
1348 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1349 meta.insert("namespace".into(), json!(ns));
1350 }
1351 } else {
1352 meta.insert("type".into(), json!("bytes"));
1354 }
1355 Ok(Value::Object(meta))
1356 };
1357 let val = match dt {
1358 DataType::Null => Value::String("null".into()),
1359 DataType::Boolean => Value::String("boolean".into()),
1360 #[cfg(not(feature = "avro_custom_types"))]
1361 DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 => {
1362 Value::String("int".into())
1363 }
1364 DataType::Int32 => Value::String("int".into()),
1365 #[cfg(feature = "avro_custom_types")]
1366 DataType::Int8 => json!({ "type": "int", "logicalType": "arrow.int8" }),
1367 #[cfg(feature = "avro_custom_types")]
1368 DataType::Int16 => json!({ "type": "int", "logicalType": "arrow.int16" }),
1369 #[cfg(feature = "avro_custom_types")]
1370 DataType::UInt8 => json!({ "type": "int", "logicalType": "arrow.uint8" }),
1371 #[cfg(feature = "avro_custom_types")]
1372 DataType::UInt16 => json!({ "type": "int", "logicalType": "arrow.uint16" }),
1373 #[cfg(not(feature = "avro_custom_types"))]
1374 DataType::UInt32 => Value::String("long".into()),
1375 #[cfg(feature = "avro_custom_types")]
1376 DataType::UInt32 => json!({ "type": "long", "logicalType": "arrow.uint32" }),
1377 DataType::Int64 => Value::String("long".into()),
1378 #[cfg(not(feature = "avro_custom_types"))]
1379 DataType::UInt64 => Value::String("long".into()),
1380 #[cfg(feature = "avro_custom_types")]
1381 DataType::UInt64 => {
1382 let chosen_name = metadata
1384 .get(AVRO_NAME_METADATA_KEY)
1385 .map(|s| sanitise_avro_name(s))
1386 .unwrap_or_else(|| name_gen.make_unique(field_name));
1387 let mut obj = JsonMap::from_iter([
1388 ("type".into(), json!("fixed")),
1389 ("name".into(), json!(chosen_name)),
1390 ("size".into(), json!(8)),
1391 ("logicalType".into(), json!("arrow.uint64")),
1392 ]);
1393 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1394 obj.insert("namespace".into(), json!(ns));
1395 }
1396 json!(obj)
1397 }
1398 #[cfg(not(feature = "avro_custom_types"))]
1399 DataType::Float16 => Value::String("float".into()),
1400 #[cfg(feature = "avro_custom_types")]
1401 DataType::Float16 => {
1402 let chosen_name = metadata
1404 .get(AVRO_NAME_METADATA_KEY)
1405 .map(|s| sanitise_avro_name(s))
1406 .unwrap_or_else(|| name_gen.make_unique(field_name));
1407 let mut obj = JsonMap::from_iter([
1408 ("type".into(), json!("fixed")),
1409 ("name".into(), json!(chosen_name)),
1410 ("size".into(), json!(2)),
1411 ("logicalType".into(), json!("arrow.float16")),
1412 ]);
1413 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1414 obj.insert("namespace".into(), json!(ns));
1415 }
1416 json!(obj)
1417 }
1418 DataType::Float32 => Value::String("float".into()),
1419 DataType::Float64 => Value::String("double".into()),
1420 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1421 DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1422 DataType::BinaryView => {
1423 if !strip {
1424 extras.insert("arrowBinaryView".into(), Value::Bool(true));
1425 }
1426 Value::String("bytes".into())
1427 }
1428 DataType::FixedSizeBinary(len) => {
1429 let md_is_uuid = metadata
1430 .get("logicalType")
1431 .map(|s| s.trim_matches('"') == "uuid")
1432 .unwrap_or(false);
1433 #[cfg(feature = "canonical_extension_types")]
1434 let ext_is_uuid = metadata
1435 .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1436 .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1437 .unwrap_or(false);
1438 #[cfg(not(feature = "canonical_extension_types"))]
1439 let ext_is_uuid = false;
1440 let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1441 if is_uuid {
1442 json!({ "type": "string", "logicalType": "uuid" })
1443 } else {
1444 let chosen_name = metadata
1445 .get(AVRO_NAME_METADATA_KEY)
1446 .map(|s| sanitise_avro_name(s))
1447 .unwrap_or_else(|| name_gen.make_unique(field_name));
1448 let mut obj = JsonMap::from_iter([
1449 ("type".into(), json!("fixed")),
1450 ("name".into(), json!(chosen_name)),
1451 ("size".into(), json!(len)),
1452 ]);
1453 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1454 obj.insert("namespace".into(), json!(ns));
1455 }
1456 Value::Object(obj)
1457 }
1458 }
1459 #[cfg(feature = "small_decimals")]
1460 DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1461 handle_decimal(precision, scale)?
1462 }
1463 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1464 handle_decimal(precision, scale)?
1465 }
1466 DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1467 #[cfg(not(feature = "avro_custom_types"))]
1468 DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1469 #[cfg(feature = "avro_custom_types")]
1470 DataType::Date64 => json!({ "type": "long", "logicalType": "arrow.date64" }),
1471 DataType::Time32(unit) => match unit {
1472 TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1473 #[cfg(not(feature = "avro_custom_types"))]
1474 TimeUnit::Second => {
1475 if !strip {
1477 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1478 }
1479 json!({ "type": "int", "logicalType": "time-millis" })
1480 }
1481 #[cfg(feature = "avro_custom_types")]
1482 TimeUnit::Second => {
1483 json!({ "type": "int", "logicalType": "arrow.time32-second" })
1484 }
1485 _ => Value::String("int".into()),
1486 },
1487 DataType::Time64(unit) => match unit {
1488 TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1489 #[cfg(not(feature = "avro_custom_types"))]
1490 TimeUnit::Nanosecond => {
1491 if !strip {
1493 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1494 }
1495 json!({ "type": "long", "logicalType": "time-micros" })
1496 }
1497 #[cfg(feature = "avro_custom_types")]
1498 TimeUnit::Nanosecond => {
1499 json!({ "type": "long", "logicalType": "arrow.time64-nanosecond" })
1500 }
1501 _ => Value::String("long".into()),
1502 },
1503 DataType::Timestamp(unit, tz) => {
1504 #[cfg(feature = "avro_custom_types")]
1505 if matches!(unit, TimeUnit::Second) {
1506 let logical_type = if tz.is_some() {
1507 "arrow.timestamp-second"
1508 } else {
1509 "arrow.local-timestamp-second"
1510 };
1511 return Ok((
1512 json!({ "type": "long", "logicalType": logical_type }),
1513 extras,
1514 ));
1515 }
1516 let logical_type = match (unit, tz.is_some()) {
1517 (TimeUnit::Millisecond, true) => "timestamp-millis",
1518 (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1519 (TimeUnit::Microsecond, true) => "timestamp-micros",
1520 (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1521 (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1522 (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1523 (TimeUnit::Second, has_tz) => {
1524 if !strip {
1526 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1527 }
1528 let ts_logical_type = if has_tz {
1529 "timestamp-millis"
1530 } else {
1531 "local-timestamp-millis"
1532 };
1533 return Ok((
1534 json!({ "type": "long", "logicalType": ts_logical_type }),
1535 extras,
1536 ));
1537 }
1538 };
1539 if !strip && matches!(unit, TimeUnit::Nanosecond) {
1540 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1541 }
1542 json!({ "type": "long", "logicalType": logical_type })
1543 }
1544 #[cfg(not(feature = "avro_custom_types"))]
1545 DataType::Duration(_unit) => Value::String("long".into()),
1546 #[cfg(feature = "avro_custom_types")]
1547 DataType::Duration(unit) => {
1548 let logical_type = match unit {
1551 TimeUnit::Second => "arrow.duration-seconds",
1552 TimeUnit::Millisecond => "arrow.duration-millis",
1553 TimeUnit::Microsecond => "arrow.duration-micros",
1554 TimeUnit::Nanosecond => "arrow.duration-nanos",
1555 };
1556 json!({ "type": "long", "logicalType": logical_type })
1557 }
1558 #[cfg(not(feature = "avro_custom_types"))]
1559 DataType::Interval(IntervalUnit::MonthDayNano) => {
1560 let chosen_name = metadata
1562 .get(AVRO_NAME_METADATA_KEY)
1563 .map(|s| sanitise_avro_name(s))
1564 .unwrap_or_else(|| name_gen.make_unique(field_name));
1565 let mut obj = JsonMap::from_iter([
1566 ("type".into(), json!("fixed")),
1567 ("name".into(), json!(chosen_name)),
1568 ("size".into(), json!(12)),
1569 ("logicalType".into(), json!("duration")),
1570 ]);
1571 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1572 obj.insert("namespace".into(), json!(ns));
1573 }
1574 json!(obj)
1575 }
1576 #[cfg(feature = "avro_custom_types")]
1577 DataType::Interval(IntervalUnit::MonthDayNano) => {
1578 let chosen_name = metadata
1581 .get(AVRO_NAME_METADATA_KEY)
1582 .map(|s| sanitise_avro_name(s))
1583 .unwrap_or_else(|| name_gen.make_unique(field_name));
1584 let mut obj = JsonMap::from_iter([
1585 ("type".into(), json!("fixed")),
1586 ("name".into(), json!(chosen_name)),
1587 ("size".into(), json!(16)),
1588 ("logicalType".into(), json!("arrow.interval-month-day-nano")),
1589 ]);
1590 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1591 obj.insert("namespace".into(), json!(ns));
1592 }
1593 json!(obj)
1594 }
1595 #[cfg(not(feature = "avro_custom_types"))]
1596 DataType::Interval(IntervalUnit::YearMonth) => {
1597 let chosen_name = metadata
1599 .get(AVRO_NAME_METADATA_KEY)
1600 .map(|s| sanitise_avro_name(s))
1601 .unwrap_or_else(|| name_gen.make_unique(field_name));
1602 let mut extras = JsonMap::from_iter([
1603 ("type".into(), json!("fixed")),
1604 ("name".into(), json!(chosen_name)),
1605 ("size".into(), json!(12)),
1606 ("logicalType".into(), json!("duration")),
1607 ]);
1608 if !strip {
1609 extras.insert(
1610 "arrowIntervalUnit".into(),
1611 Value::String("yearmonth".into()),
1612 );
1613 }
1614 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1615 extras.insert("namespace".into(), json!(ns));
1616 }
1617 json!(extras)
1618 }
1619 #[cfg(feature = "avro_custom_types")]
1620 DataType::Interval(IntervalUnit::YearMonth) => {
1621 let chosen_name = metadata
1622 .get(AVRO_NAME_METADATA_KEY)
1623 .map(|s| sanitise_avro_name(s))
1624 .unwrap_or_else(|| name_gen.make_unique(field_name));
1625 let mut obj = JsonMap::from_iter([
1626 ("type".into(), json!("fixed")),
1627 ("name".into(), json!(chosen_name)),
1628 ("size".into(), json!(4)),
1629 ("logicalType".into(), json!("arrow.interval-year-month")),
1630 ]);
1631 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1632 obj.insert("namespace".into(), json!(ns));
1633 }
1634 json!(obj)
1635 }
1636 #[cfg(not(feature = "avro_custom_types"))]
1637 DataType::Interval(IntervalUnit::DayTime) => {
1638 let chosen_name = metadata
1640 .get(AVRO_NAME_METADATA_KEY)
1641 .map(|s| sanitise_avro_name(s))
1642 .unwrap_or_else(|| name_gen.make_unique(field_name));
1643 let mut obj = JsonMap::from_iter([
1644 ("type".into(), json!("fixed")),
1645 ("name".into(), json!(chosen_name)),
1646 ("size".into(), json!(12)),
1647 ("logicalType".into(), json!("duration")),
1648 ]);
1649 if !strip {
1650 obj.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1651 }
1652 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1653 obj.insert("namespace".into(), json!(ns));
1654 }
1655 json!(obj)
1656 }
1657 #[cfg(feature = "avro_custom_types")]
1658 DataType::Interval(IntervalUnit::DayTime) => {
1659 let chosen_name = metadata
1660 .get(AVRO_NAME_METADATA_KEY)
1661 .map(|s| sanitise_avro_name(s))
1662 .unwrap_or_else(|| name_gen.make_unique(field_name));
1663 let mut obj = JsonMap::from_iter([
1664 ("type".into(), json!("fixed")),
1665 ("name".into(), json!(chosen_name)),
1666 ("size".into(), json!(8)),
1667 ("logicalType".into(), json!("arrow.interval-day-time")),
1668 ]);
1669 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1670 obj.insert("namespace".into(), json!(ns));
1671 }
1672 json!(obj)
1673 }
1674 DataType::List(child) | DataType::LargeList(child) => {
1675 if matches!(dt, DataType::LargeList(_)) && !strip {
1676 extras.insert("arrowLargeList".into(), Value::Bool(true));
1677 }
1678 let items_schema = process_datatype(
1679 child.data_type(),
1680 child.name(),
1681 child.metadata(),
1682 name_gen,
1683 null_order,
1684 child.is_nullable(),
1685 strip,
1686 )?;
1687 json!({
1688 "type": "array",
1689 "items": items_schema
1690 })
1691 }
1692 DataType::ListView(child) | DataType::LargeListView(child) => {
1693 if matches!(dt, DataType::LargeListView(_)) && !strip {
1694 extras.insert("arrowLargeList".into(), Value::Bool(true));
1695 }
1696 if !strip {
1697 extras.insert("arrowListView".into(), Value::Bool(true));
1698 }
1699 let items_schema = process_datatype(
1700 child.data_type(),
1701 child.name(),
1702 child.metadata(),
1703 name_gen,
1704 null_order,
1705 child.is_nullable(),
1706 strip,
1707 )?;
1708 json!({
1709 "type": "array",
1710 "items": items_schema
1711 })
1712 }
1713 DataType::FixedSizeList(child, len) => {
1714 if !strip {
1715 extras.insert("arrowFixedSize".into(), json!(len));
1716 }
1717 let items_schema = process_datatype(
1718 child.data_type(),
1719 child.name(),
1720 child.metadata(),
1721 name_gen,
1722 null_order,
1723 child.is_nullable(),
1724 strip,
1725 )?;
1726 json!({
1727 "type": "array",
1728 "items": items_schema
1729 })
1730 }
1731 DataType::Map(entries, _) => {
1732 let value_field = match entries.data_type() {
1733 DataType::Struct(fs) => &fs[1],
1734 _ => {
1735 return Err(ArrowError::SchemaError(
1736 "Map 'entries' field must be Struct(key,value)".into(),
1737 ));
1738 }
1739 };
1740 let values_schema = process_datatype(
1741 value_field.data_type(),
1742 value_field.name(),
1743 value_field.metadata(),
1744 name_gen,
1745 null_order,
1746 value_field.is_nullable(),
1747 strip,
1748 )?;
1749 json!({
1750 "type": "map",
1751 "values": values_schema
1752 })
1753 }
1754 DataType::Struct(fields) => {
1755 let avro_fields = fields
1756 .iter()
1757 .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1758 .collect::<Result<Vec<_>, _>>()?;
1759 let chosen_name = metadata
1761 .get(AVRO_NAME_METADATA_KEY)
1762 .map(|s| sanitise_avro_name(s))
1763 .unwrap_or_else(|| name_gen.make_unique(field_name));
1764 let mut obj = JsonMap::from_iter([
1765 ("type".into(), json!("record")),
1766 ("name".into(), json!(chosen_name)),
1767 ("fields".into(), Value::Array(avro_fields)),
1768 ]);
1769 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1770 obj.insert("namespace".into(), json!(ns));
1771 }
1772 Value::Object(obj)
1773 }
1774 DataType::Dictionary(_, value) => {
1775 if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1776 let symbols: Vec<&str> =
1777 serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1778 let chosen_name = metadata
1780 .get(AVRO_NAME_METADATA_KEY)
1781 .map(|s| sanitise_avro_name(s))
1782 .unwrap_or_else(|| name_gen.make_unique(field_name));
1783 let mut obj = JsonMap::from_iter([
1784 ("type".into(), json!("enum")),
1785 ("name".into(), json!(chosen_name)),
1786 ("symbols".into(), json!(symbols)),
1787 ]);
1788 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1789 obj.insert("namespace".into(), json!(ns));
1790 }
1791 Value::Object(obj)
1792 } else {
1793 process_datatype(
1794 value.as_ref(),
1795 field_name,
1796 metadata,
1797 name_gen,
1798 null_order,
1799 false,
1800 strip,
1801 )?
1802 }
1803 }
1804 #[cfg(feature = "avro_custom_types")]
1805 DataType::RunEndEncoded(run_ends, values) => {
1806 let bits = match run_ends.data_type() {
1807 DataType::Int16 => 16,
1808 DataType::Int32 => 32,
1809 DataType::Int64 => 64,
1810 other => {
1811 return Err(ArrowError::SchemaError(format!(
1812 "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1813 )));
1814 }
1815 };
1816 let (value_schema, value_extras) = datatype_to_avro(
1818 values.data_type(),
1819 values.name(),
1820 values.metadata(),
1821 name_gen,
1822 null_order,
1823 strip,
1824 )?;
1825 let mut merged = merge_extras(value_schema, value_extras);
1826 if values.is_nullable() {
1827 merged = wrap_nullable(merged, null_order);
1828 }
1829 let mut extras = JsonMap::new();
1830 extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1831 extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1832 return Ok((merged, extras));
1833 }
1834 #[cfg(not(feature = "avro_custom_types"))]
1835 DataType::RunEndEncoded(_run_ends, values) => {
1836 let (value_schema, _extras) = datatype_to_avro(
1837 values.data_type(),
1838 values.name(),
1839 values.metadata(),
1840 name_gen,
1841 null_order,
1842 strip,
1843 )?;
1844 return Ok((value_schema, JsonMap::new()));
1845 }
1846 DataType::Union(fields, mode) => {
1847 let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1848 let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1849 for (type_id, field_ref) in fields.iter() {
1850 let (branch_schema, _branch_extras) = datatype_to_avro(
1852 field_ref.data_type(),
1853 field_ref.name(),
1854 field_ref.metadata(),
1855 name_gen,
1856 null_order,
1857 strip,
1858 )?;
1859 if matches!(branch_schema, Value::Array(_)) {
1861 return Err(ArrowError::SchemaError(
1862 "Avro union may not immediately contain another union".into(),
1863 ));
1864 }
1865 branches.push(branch_schema);
1866 type_ids.push(type_id as i32);
1867 }
1868 let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1869 for b in &branches {
1870 let sig = union_branch_signature(b)?;
1871 if !seen.insert(sig) {
1872 return Err(ArrowError::SchemaError(
1873 "Avro union contains duplicate branch types (disallowed by spec)".into(),
1874 ));
1875 }
1876 }
1877 if !strip {
1878 extras.insert(
1879 "arrowUnionMode".into(),
1880 Value::String(
1881 match mode {
1882 UnionMode::Sparse => "sparse",
1883 UnionMode::Dense => "dense",
1884 }
1885 .to_string(),
1886 ),
1887 );
1888 extras.insert(
1889 "arrowUnionTypeIds".into(),
1890 Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1891 );
1892 }
1893 Value::Array(branches)
1894 }
1895 #[cfg(not(feature = "small_decimals"))]
1896 other => {
1897 return Err(ArrowError::NotYetImplemented(format!(
1898 "Arrow type {other:?} has no Avro representation"
1899 )));
1900 }
1901 };
1902 Ok((val, extras))
1903}
1904
1905fn process_datatype(
1906 dt: &DataType,
1907 field_name: &str,
1908 metadata: &HashMap<String, String>,
1909 name_gen: &mut NameGenerator,
1910 null_order: Nullability,
1911 is_nullable: bool,
1912 strip: bool,
1913) -> Result<Value, ArrowError> {
1914 let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1915 let mut merged = merge_extras(schema, extras);
1916 if is_nullable {
1917 merged = wrap_nullable(merged, null_order)
1918 }
1919 Ok(merged)
1920}
1921
1922fn arrow_field_to_avro(
1923 field: &ArrowField,
1924 name_gen: &mut NameGenerator,
1925 null_order: Nullability,
1926 strip: bool,
1927) -> Result<Value, ArrowError> {
1928 let avro_name = sanitise_avro_name(field.name());
1929 let schema_value = process_datatype(
1930 field.data_type(),
1931 &avro_name,
1932 field.metadata(),
1933 name_gen,
1934 null_order,
1935 field.is_nullable(),
1936 strip,
1937 )?;
1938 let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1940 map.insert("name".into(), Value::String(avro_name));
1941 map.insert("type".into(), schema_value);
1942 for (meta_key, meta_val) in field.metadata() {
1944 if is_internal_arrow_key(meta_key) {
1945 continue;
1946 }
1947 match meta_key.as_str() {
1948 AVRO_DOC_METADATA_KEY => {
1949 map.insert("doc".into(), Value::String(meta_val.clone()));
1950 }
1951 AVRO_FIELD_DEFAULT_METADATA_KEY => {
1952 let default_value = serde_json::from_str(meta_val)
1953 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1954 map.insert("default".into(), default_value);
1955 }
1956 _ => {
1957 let json_val = serde_json::from_str(meta_val)
1958 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1959 map.insert(meta_key.clone(), json_val);
1960 }
1961 }
1962 }
1963 Ok(Value::Object(map))
1964}
1965
1966#[cfg(test)]
1967mod tests {
1968 use super::*;
1969 use crate::codec::{AvroField, AvroFieldBuilder};
1970 use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1971 use serde_json::json;
1972 use std::sync::Arc;
1973
1974 fn int_schema() -> Schema<'static> {
1975 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1976 }
1977
1978 fn record_schema() -> Schema<'static> {
1979 Schema::Complex(ComplexType::Record(Record {
1980 name: "record1",
1981 namespace: Some("test.namespace"),
1982 doc: Some(Cow::from("A test record")),
1983 aliases: vec![],
1984 fields: vec![
1985 Field {
1986 name: "field1",
1987 doc: Some(Cow::from("An integer field")),
1988 r#type: int_schema(),
1989 default: None,
1990 aliases: vec![],
1991 },
1992 Field {
1993 name: "field2",
1994 doc: None,
1995 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1996 default: None,
1997 aliases: vec![],
1998 },
1999 ],
2000 attributes: Attributes::default(),
2001 }))
2002 }
2003
2004 fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
2005 let mut sb = SchemaBuilder::new();
2006 sb.push(field);
2007 sb.finish()
2008 }
2009
/// Panics unless `needle` occurs somewhere inside `avro_json`.
///
/// The panic message names both the missing fragment and the full JSON text.
fn assert_json_contains(avro_json: &str, needle: &str) {
    let found = avro_json.find(needle).is_some();
    if !found {
        panic!("JSON did not contain `{needle}` : {avro_json}");
    }
}
2016
/// Checks that Avro JSON schemas deserialize into the expected `Schema` trees,
/// and that the results behave as expected when converted with
/// `AvroField::try_from`.
#[test]
fn test_deserialize() {
    /// Schema naming a primitive type.
    fn prim(kind: PrimitiveType) -> Schema<'static> {
        Schema::TypeName(TypeName::Primitive(kind))
    }

    /// Field without doc, default or aliases.
    fn bare_field<'a>(name: &'a str, ty: Schema<'a>) -> Field<'a> {
        Field {
            name,
            doc: None,
            r#type: ty,
            default: None,
            aliases: vec![],
        }
    }

    /// Record schema without doc or extra attributes.
    fn record_of<'a>(
        name: &'a str,
        namespace: Option<&'a str>,
        aliases: Vec<&'a str>,
        fields: Vec<Field<'a>>,
    ) -> Schema<'a> {
        Schema::Complex(ComplexType::Record(Record {
            name,
            namespace,
            doc: None,
            aliases,
            fields,
            attributes: Default::default(),
        }))
    }

    // A bare JSON string names a primitive type.
    let string_schema: Schema = serde_json::from_str("\"string\"").unwrap();
    assert_eq!(string_schema, prim(PrimitiveType::String));

    // A JSON array is a union whose branches keep their written order.
    let int_or_null: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
    assert_eq!(
        int_or_null,
        Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::Null)])
    );

    // A primitive type annotated with a logical type.
    let parsed_timestamp: Type = serde_json::from_str(
        r#"{
            "type":"long",
            "logicalType":"timestamp-micros"
        }"#,
    )
    .unwrap();
    let timestamp = Type {
        r#type: TypeName::Primitive(PrimitiveType::Long),
        attributes: Attributes {
            logical_type: Some("timestamp-micros"),
            additional: Default::default(),
        },
    };
    assert_eq!(parsed_timestamp, timestamp);

    // A fixed type carrying decimal attributes; unknown keys land in `additional`.
    let parsed_decimal: ComplexType = serde_json::from_str(
        r#"{
            "type":"fixed",
            "name":"fixed",
            "namespace":"topLevelRecord.value",
            "size":11,
            "logicalType":"decimal",
            "precision":25,
            "scale":2
        }"#,
    )
    .unwrap();
    let decimal_extras = vec![("precision", json!(25)), ("scale", json!(2))];
    let decimal = ComplexType::Fixed(Fixed {
        name: "fixed",
        namespace: Some("topLevelRecord.value"),
        aliases: vec![],
        size: 11,
        attributes: Attributes {
            logical_type: Some("decimal"),
            additional: decimal_extras.into_iter().collect(),
        },
    });
    assert_eq!(parsed_decimal, decimal);

    // The same decimal nested inside a nullable record field.
    let decimal_record: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"topLevelRecord",
            "fields":[
                {
                    "name":"value",
                    "type":[
                        {
                            "type":"fixed",
                            "name":"fixed",
                            "namespace":"topLevelRecord.value",
                            "size":11,
                            "logicalType":"decimal",
                            "precision":25,
                            "scale":2
                        },
                        "null"
                    ]
                }
            ]
        }"#,
    )
    .unwrap();
    let nullable_decimal = Schema::Union(vec![
        Schema::Complex(decimal),
        prim(PrimitiveType::Null),
    ]);
    assert_eq!(
        decimal_record,
        record_of(
            "topLevelRecord",
            None,
            vec![],
            vec![bare_field("value", nullable_decimal)],
        )
    );

    // A self-referential record: the inner mention is a named reference.
    let long_list: Schema = serde_json::from_str(
        r#"{
            "type": "record",
            "name": "LongList",
            "aliases": ["LinkedLongs"],
            "fields" : [
                {"name": "value", "type": "long"},
                {"name": "next", "type": ["null", "LongList"]}
            ]
        }"#,
    )
    .unwrap();
    let next_link = Schema::Union(vec![
        prim(PrimitiveType::Null),
        Schema::TypeName(TypeName::Ref("LongList")),
    ]);
    assert_eq!(
        long_list,
        record_of(
            "LongList",
            None,
            vec!["LinkedLongs"],
            vec![
                bare_field("value", prim(PrimitiveType::Long)),
                bare_field("next", next_link),
            ],
        )
    );

    // Converting the recursive schema is expected to fail on the reference.
    let resolve_error = AvroField::try_from(&long_list).unwrap_err().to_string();
    assert_eq!(resolve_error, "Parser error: Failed to resolve .LongList");

    // A record with a nullable int and a nullable timestamp.
    let id_and_timestamp: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"topLevelRecord",
            "fields":[
                {
                    "name":"id",
                    "type":[
                        "int",
                        "null"
                    ]
                },
                {
                    "name":"timestamp_col",
                    "type":[
                        {
                            "type":"long",
                            "logicalType":"timestamp-micros"
                        },
                        "null"
                    ]
                }
            ]
        }"#,
    )
    .unwrap();
    let nullable_int = Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::Null)]);
    let nullable_timestamp = Schema::Union(vec![
        Schema::Type(timestamp),
        prim(PrimitiveType::Null),
    ]);
    assert_eq!(
        id_and_timestamp,
        record_of(
            "topLevelRecord",
            None,
            vec![],
            vec![
                bare_field("id", nullable_int),
                bare_field("timestamp_col", nullable_timestamp),
            ],
        )
    );

    // The same schema converts to the matching Arrow struct field.
    let codec = AvroField::try_from(&id_and_timestamp).unwrap();
    let struct_children = vec![
        arrow_schema::Field::new("id", DataType::Int32, true),
        arrow_schema::Field::new(
            "timestamp_col",
            DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
            true,
        ),
    ];
    let name_metadata = std::collections::HashMap::from([(
        AVRO_NAME_METADATA_KEY.to_string(),
        "topLevelRecord".to_string(),
    )]);
    let expected_arrow_field = arrow_schema::Field::new(
        "topLevelRecord",
        DataType::Struct(Fields::from(struct_children)),
        false,
    )
    .with_metadata(name_metadata);
    assert_eq!(codec.field(), expected_arrow_field);

    // A namespaced record with an inline fixed, a reference to it, and a nullable map.
    let handshake: Schema = serde_json::from_str(
        r#"{
            "type": "record",
            "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
            "fields": [
                {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                {"name": "clientProtocol", "type": ["null", "string"]},
                {"name": "serverHash", "type": "MD5"},
                {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
            ]
        }"#,
    )
    .unwrap();
    let md5_fixed = Schema::Complex(ComplexType::Fixed(Fixed {
        name: "MD5",
        namespace: None,
        aliases: vec![],
        size: 16,
        attributes: Default::default(),
    }));
    let nullable_string = Schema::Union(vec![
        prim(PrimitiveType::Null),
        prim(PrimitiveType::String),
    ]);
    let nullable_bytes_map = Schema::Union(vec![
        prim(PrimitiveType::Null),
        Schema::Complex(ComplexType::Map(Map {
            values: Box::new(prim(PrimitiveType::Bytes)),
            attributes: Default::default(),
        })),
    ]);
    assert_eq!(
        handshake,
        record_of(
            "HandshakeRequest",
            Some("org.apache.avro.ipc"),
            vec![],
            vec![
                bare_field("clientHash", md5_fixed),
                bare_field("clientProtocol", nullable_string),
                bare_field("serverHash", Schema::TypeName(TypeName::Ref("MD5"))),
                bare_field("meta", nullable_bytes_map),
            ],
        )
    );
}
2324
2325 #[test]
2326 fn test_canonical_form_generation_comprehensive_record() {
2327 let json_str = r#"{
2329 "type": "record",
2330 "name": "E2eComprehensive",
2331 "namespace": "org.apache.arrow.avrotests.v1",
2332 "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
2333 "fields": [
2334 {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
2335 {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
2336 {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
2337 {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
2338 {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
2339 {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
2340 {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
2341 {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
2342 {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
2343 {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
2344 {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
2345 {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
2346 {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
2347 {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
2348 {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
2349 {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
2350 {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
2351 {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
2352 {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
2353 {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
2354 {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
2355 {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
2356 {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
2357 {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
2358 {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
2359 {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
2360 {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
2361 {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
2362 {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
2363 {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
2364 ]}, "doc": "Embedded Address record"},
2365 {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
2366 {"name": "user", "type": "string", "doc": "Username"},
2367 {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
2368 ]}},
2369 {"name": "union_enum_record_array_map", "type": [
2370 {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
2371 {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
2372 {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
2373 {"type": "array", "items": "long"},
2374 {"type": "map", "values": "string"}
2375 ], "doc": "Union of enum, two records, array, and map"},
2376 {"name": "union_date_or_fixed4", "type": [
2377 {"type": "int", "logicalType": "date"},
2378 {"type": "fixed", "name": "Fx4", "size": 4}
2379 ], "doc": "Union of date(int) or fixed(4)"},
2380 {"name": "union_interval_or_string", "type": [
2381 {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
2382 "string"
2383 ], "doc": "Union of duration(fixed12) or string"},
2384 {"name": "union_uuid_or_fixed10", "type": [
2385 {"type": "string", "logicalType": "uuid"},
2386 {"type": "fixed", "name": "Fx10", "size": 10}
2387 ], "doc": "Union of UUID string or fixed(10)"},
2388 {"name": "array_records_with_union", "type": {"type": "array", "items": {
2389 "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
2390 "fields": [
2391 {"name": "key", "type": "string"},
2392 {"name": "val", "type": ["null", "int", "long"], "default": null}
2393 ]
2394 }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
2395 {"name": "union_map_or_array_int", "type": [
2396 {"type": "map", "values": "int"},
2397 {"type": "array", "items": "int"}
2398 ], "doc": "Union[map<string,int>, array<int>]"},
2399 {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
2400 {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
2401 {"name": "name", "type": "string"},
2402 {"name": "age", "type": "int", "default": 0}
2403 ]}, "doc": "Record using type alias for schema evolution tests"}
2404 ]
2405 }"#;
2406 let avro = AvroSchema::new(json_str.to_string());
2407 let parsed = avro.schema().expect("schema should deserialize");
2408 let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrotests.v1.
types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
2409 let canonical_form =
2410 AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
2411 assert_eq!(
2412 canonical_form, expected_canonical_form,
2413 "Canonical form must match Avro spec PCF exactly"
2414 );
2415 }
2416
/// A freshly constructed `SchemaStore` holds no schemas.
#[test]
fn test_new_schema_store() {
    let fresh_store = SchemaStore::new();
    assert!(fresh_store.schemas.is_empty());
}
2422
/// Building a `SchemaStore` from a fingerprint -> schema map keeps every entry
/// retrievable under its Rabin fingerprint.
#[test]
fn test_try_from_schemas_rabin() {
    let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());

    // Fingerprints are computed once and reused as both map keys and lookup keys.
    let int_fp = int_avro_schema
        .fingerprint(FingerprintAlgorithm::Rabin)
        .unwrap();
    let rec_fp = record_avro_schema
        .fingerprint(FingerprintAlgorithm::Rabin)
        .unwrap();

    let schemas: HashMap<Fingerprint, AvroSchema> = HashMap::from([
        (int_fp, int_avro_schema.clone()),
        (rec_fp, record_avro_schema.clone()),
    ]);
    let store = SchemaStore::try_from(schemas).unwrap();

    assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
    assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
}
2450
/// Inserting the int schema twice under the same fingerprint still yields a
/// two-entry store, and the int schema remains retrievable.
#[test]
fn test_try_from_with_duplicates() {
    let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());

    let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
    // The trailing int schema is the deliberate duplicate insert.
    for candidate in [&int_avro_schema, &record_avro_schema, &int_avro_schema] {
        let key = candidate
            .fingerprint(FingerprintAlgorithm::Rabin)
            .unwrap();
        schemas.insert(key, candidate.clone());
    }

    let store = SchemaStore::try_from(schemas).unwrap();
    assert_eq!(store.schemas.len(), 2);

    let int_fp = int_avro_schema
        .fingerprint(FingerprintAlgorithm::Rabin)
        .unwrap();
    assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
}
2482
/// Registering a schema returns a Rabin fingerprint that resolves back to the
/// schema, while a neighbouring fingerprint value resolves to nothing.
#[test]
fn test_register_and_lookup_rabin() {
    let mut store = SchemaStore::new();
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let registered = store.register(schema.clone()).unwrap();

    // Any non-Rabin variant here means the test setup itself is wrong.
    let Fingerprint::Rabin(fp_val) = registered else {
        unreachable!("This test should only generate Rabin fingerprints")
    };

    let hit = store.lookup(&Fingerprint::Rabin(fp_val)).cloned();
    assert_eq!(hit, Some(schema.clone()));

    let miss = store.lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)));
    assert!(miss.is_none());
}
2516
/// `set` with an explicit `Fingerprint::Id` echoes the fingerprint back and makes
/// the schema retrievable under that id only.
#[test]
fn test_set_and_lookup_id() {
    let mut store = SchemaStore::new();
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());

    let id = 42u32;
    let key = Fingerprint::Id(id);
    let neighbour = Fingerprint::Id(id.wrapping_add(1));

    let returned = store.set(key, schema.clone()).unwrap();
    assert_eq!(returned, key);
    assert_eq!(store.lookup(&key).cloned(), Some(schema));
    assert!(store.lookup(&neighbour).is_none());
}
2528
/// `set` with an explicit `Fingerprint::Id64` echoes the fingerprint back and makes
/// the schema retrievable under that 64-bit id only.
#[test]
fn test_set_and_lookup_id64() {
    let mut store = SchemaStore::new();
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());

    let id64: u64 = 0xDEAD_BEEF_DEAD_BEEF;
    let key = Fingerprint::Id64(id64);
    let neighbour = Fingerprint::Id64(id64.wrapping_add(1));

    let returned = store.set(key, schema.clone()).unwrap();
    assert_eq!(returned, key, "set should return the same Id64 fingerprint");

    let found = store.lookup(&key).cloned();
    assert_eq!(found, Some(schema), "lookup should find the schema by Id64");

    assert!(
        store.lookup(&neighbour).is_none(),
        "lookup with a different Id64 must return None"
    );
}
2549
/// Exercises the `From` conversions between `Fingerprint`, `FingerprintAlgorithm`
/// and `FingerprintStrategy` for the `Id64` variant.
#[test]
fn test_fingerprint_id64_conversions() {
    // Fingerprint -> algorithm.
    let algorithm = FingerprintAlgorithm::from(&Fingerprint::Id64(123));
    assert_eq!(algorithm, FingerprintAlgorithm::Id64);

    // Algorithm -> fingerprint: the test expects a zero-valued Id64.
    let fingerprint = Fingerprint::from(FingerprintAlgorithm::Id64);
    assert!(matches!(fingerprint, Fingerprint::Id64(0)));

    // Fingerprint -> strategy: the test expects the payload to come back as 0.
    let strategy = FingerprintStrategy::from(Fingerprint::Id64(5));
    assert!(matches!(strategy, FingerprintStrategy::Id64(0)));

    // Strategy -> algorithm.
    let round_tripped = FingerprintAlgorithm::from(strategy);
    assert_eq!(round_tripped, FingerprintAlgorithm::Id64);
}
2561
/// Registering two identical schemas yields the same fingerprint and a single
/// stored entry.
#[test]
fn test_register_duplicate_schema() {
    let mut store = SchemaStore::new();
    let make_int_schema = || AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());

    let first = store.register(make_int_schema()).unwrap();
    let second = store.register(make_int_schema()).unwrap();

    assert_eq!(first, second);
    assert_eq!(store.schemas.len(), 1);
}
2572
/// `set` accepts a caller-computed Rabin fingerprint, returns it unchanged, and
/// the schema can then be looked up under it.
#[test]
fn test_set_and_lookup_with_provided_fingerprint() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rabin_fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();

    let mut store = SchemaStore::new();
    let returned = store.set(rabin_fp, schema.clone()).unwrap();

    assert_eq!(returned, rabin_fp);
    assert_eq!(store.lookup(&rabin_fp).cloned(), Some(schema));
}
2582
/// Setting the same schema twice under the same fingerprint succeeds both times
/// and leaves exactly one entry in the store.
#[test]
fn test_set_duplicate_same_schema_ok() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rabin_fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();

    let mut store = SchemaStore::new();
    for _attempt in 0..2 {
        let _ = store.set(rabin_fp, schema.clone()).unwrap();
    }

    assert_eq!(store.schemas.len(), 1);
}
2592
/// Setting a different schema under an already-used fingerprint fails with a
/// "Schema fingerprint collision" error.
#[test]
fn test_set_duplicate_different_schema_collision_error() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let shared_fp = Fingerprint::Id(123);

    let mut store = SchemaStore::new();
    let _ = store.set(shared_fp, int_avro).unwrap();

    let collision = store.set(shared_fp, record_avro).unwrap_err();
    let rendered = collision.to_string();
    assert!(rendered.contains("Schema fingerprint collision"));
}
2605
/// The canonical form of the primitive `int` schema is the quoted token `"int"`.
#[test]
fn test_canonical_form_generation_primitive() {
    let primitive = int_schema();
    let rendered = AvroSchema::generate_canonical_form(&primitive).unwrap();
    assert_eq!(rendered, r#""int""#);
}
2612
/// The canonical form of the sample record uses the fully qualified name and lists
/// `name`, `type`, `fields` in that order.
#[test]
fn test_canonical_form_generation_record() {
    let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
    let record = record_schema();
    let rendered = AvroSchema::generate_canonical_form(&record).unwrap();
    assert_eq!(rendered, expected_canonical_form);
}
2620
/// Pins the Rabin fingerprint of a fixed input string to a known value.
#[test]
fn test_fingerprint_calculation() {
    let input = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
    let actual = compute_fingerprint_rabin(input);
    let expected = 10505236152925314060;
    assert_eq!(actual, expected);
}
2628
/// Registering the sample record yields the Rabin fingerprint of its canonical
/// form, and the schema can be looked up under that fingerprint.
#[test]
fn test_register_and_lookup_complex_schema() {
    let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
    let expected_fingerprint = Fingerprint::Rabin(compute_fingerprint_rabin(canonical_form));

    let record_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let mut store = SchemaStore::new();
    let registered = store.register(record_avro.clone()).unwrap();
    assert_eq!(registered, expected_fingerprint);

    let resolved = store.lookup(&registered).cloned();
    assert_eq!(resolved, Some(record_avro));
}
2640
/// `fingerprints()` reports every fingerprint that `register` handed out.
#[test]
fn test_fingerprints_returns_all_keys() {
    let mut store = SchemaStore::new();

    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let fp_int = store.register(int_avro).unwrap();

    let record_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let fp_record = store.register(record_avro).unwrap();

    let all_keys = store.fingerprints();
    assert_eq!(all_keys.len(), 2);
    assert!(all_keys.contains(&fp_int));
    assert!(all_keys.contains(&fp_record));
}
2659
/// Docs, aliases and extra attributes on a record and its field do not appear in
/// the generated canonical form.
#[test]
fn test_canonical_form_strips_attributes() {
    // Field type carrying an extra "precision" attribute.
    let bytes_with_precision = Schema::Type(Type {
        r#type: TypeName::Primitive(PrimitiveType::Bytes),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("precision", json!(4))]),
        },
    });
    let documented_field = Field {
        name: "f1",
        doc: Some(Cow::from("field doc")),
        r#type: bytes_with_precision,
        default: None,
        aliases: vec![],
    };
    let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
        name: "record_with_attrs",
        namespace: None,
        doc: Some(Cow::from("This doc should be stripped")),
        aliases: vec!["alias1", "alias2"],
        fields: vec![documented_field],
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("custom_attr", json!("value"))]),
        },
    }));

    let rendered = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
    let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
    assert_eq!(rendered, expected_canonical_form);
}
2689
/// Without the `avro_custom_types` feature, each listed Arrow primitive type must
/// produce a schema whose JSON contains the paired plain Avro primitive token.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_primitive_mappings() {
    // A fixed-size array is enough: the cases are consumed exactly once, by value.
    let cases = [
        (DataType::Boolean, "\"boolean\""),
        (DataType::Int8, "\"int\""),
        (DataType::Int16, "\"int\""),
        (DataType::Int32, "\"int\""),
        (DataType::Int64, "\"long\""),
        (DataType::UInt8, "\"int\""),
        (DataType::UInt16, "\"int\""),
        (DataType::UInt32, "\"long\""),
        (DataType::UInt64, "\"long\""),
        (DataType::Float16, "\"float\""),
        (DataType::Float32, "\"float\""),
        (DataType::Float64, "\"double\""),
        (DataType::Utf8, "\"string\""),
        (DataType::Binary, "\"bytes\""),
    ];
    for (dt, avro_token) in cases {
        // `dt` is owned by the loop, so it is moved into the field instead of cloned.
        let field = ArrowField::new("col", dt, false);
        let arrow_schema = single_field_schema(field);
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, avro_token);
    }
}
2716
/// With the `avro_custom_types` feature, the narrow and unsigned integer types and
/// `Float16` must carry an `arrow.*` logical type, while the remaining primitives
/// keep their plain Avro token.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_primitive_mappings() {
    // A fixed-size array is enough: the cases are consumed exactly once, by value.
    let cases = [
        (DataType::Boolean, "\"boolean\""),
        (DataType::Int8, "\"logicalType\":\"arrow.int8\""),
        (DataType::Int16, "\"logicalType\":\"arrow.int16\""),
        (DataType::Int32, "\"int\""),
        (DataType::Int64, "\"long\""),
        (DataType::UInt8, "\"logicalType\":\"arrow.uint8\""),
        (DataType::UInt16, "\"logicalType\":\"arrow.uint16\""),
        (DataType::UInt32, "\"logicalType\":\"arrow.uint32\""),
        (DataType::UInt64, "\"logicalType\":\"arrow.uint64\""),
        (DataType::Float16, "\"logicalType\":\"arrow.float16\""),
        (DataType::Float32, "\"float\""),
        (DataType::Float64, "\"double\""),
        (DataType::Utf8, "\"string\""),
        (DataType::Binary, "\"bytes\""),
    ];
    for (dt, avro_token) in cases {
        // `dt` is owned by the loop, so it is moved into the field instead of cloned.
        let field = ArrowField::new("col", dt, false);
        let arrow_schema = single_field_schema(field);
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, avro_token);
    }
}
2743
/// With custom types enabled, fields carrying `avro.name` and `avro.namespace`
/// metadata must be emitted as `fixed` types with the expected `arrow.*` logical
/// type and the namespace preserved.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_custom_fixed_logical_types_preserve_namespace_metadata() {
    let namespace = "com.example.types";

    // Builds the name + namespace metadata map for one field.
    let named_metadata = |type_name: &str| -> HashMap<String, String> {
        HashMap::from([
            (AVRO_NAME_METADATA_KEY.to_string(), type_name.to_string()),
            (
                AVRO_NAMESPACE_METADATA_KEY.to_string(),
                namespace.to_string(),
            ),
        ])
    };

    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("u64_col", DataType::UInt64, false)
            .with_metadata(named_metadata("U64Type")),
        ArrowField::new("f16_col", DataType::Float16, false)
            .with_metadata(named_metadata("F16Type")),
        ArrowField::new(
            "iv_ym_col",
            DataType::Interval(IntervalUnit::YearMonth),
            false,
        )
        .with_metadata(named_metadata("IvYmType")),
        ArrowField::new(
            "iv_dt_col",
            DataType::Interval(IntervalUnit::DayTime),
            false,
        )
        .with_metadata(named_metadata("IvDtType")),
    ]);

    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let root: Value = serde_json::from_str(&avro.json_string).unwrap();
    let fields = root
        .get("fields")
        .and_then(Value::as_array)
        .expect("record fields array");

    let expected = [
        ("u64_col", "arrow.uint64"),
        ("f16_col", "arrow.float16"),
        ("iv_ym_col", "arrow.interval-year-month"),
        ("iv_dt_col", "arrow.interval-day-time"),
    ];

    for (field_name, logical_type) in expected {
        let matching = fields
            .iter()
            .find(|candidate| candidate.get("name").and_then(Value::as_str) == Some(field_name));
        let field = matching.unwrap_or_else(|| panic!("missing field {field_name}"));
        let ty = field
            .get("type")
            .and_then(Value::as_object)
            .unwrap_or_else(|| panic!("field {field_name} type must be object"));

        let avro_type = ty.get("type").and_then(Value::as_str);
        assert_eq!(avro_type, Some("fixed"));

        let avro_logical = ty.get("logicalType").and_then(Value::as_str);
        assert_eq!(avro_logical, Some(logical_type));

        assert_eq!(
            ty.get("namespace").and_then(Value::as_str),
            Some(namespace),
            "field {field_name} must preserve avro.namespace metadata"
        );
    }
}
2830
/// With custom types enabled, fields that only carry `avro.name` metadata must be
/// emitted as `fixed` types without a `namespace` key.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_custom_fixed_logical_types_omit_namespace_without_metadata() {
    // Builds the name-only metadata map for one field.
    let name_only = |type_name: &str| -> HashMap<String, String> {
        HashMap::from([(AVRO_NAME_METADATA_KEY.to_string(), type_name.to_string())])
    };

    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("u64_col", DataType::UInt64, false).with_metadata(name_only("U64Type")),
        ArrowField::new("f16_col", DataType::Float16, false).with_metadata(name_only("F16Type")),
        ArrowField::new(
            "iv_ym_col",
            DataType::Interval(IntervalUnit::YearMonth),
            false,
        )
        .with_metadata(name_only("IvYmType")),
        ArrowField::new(
            "iv_dt_col",
            DataType::Interval(IntervalUnit::DayTime),
            false,
        )
        .with_metadata(name_only("IvDtType")),
    ]);

    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let root: Value = serde_json::from_str(&avro.json_string).unwrap();
    let fields = root
        .get("fields")
        .and_then(Value::as_array)
        .expect("record fields array");

    for field_name in ["u64_col", "f16_col", "iv_ym_col", "iv_dt_col"] {
        let matching = fields
            .iter()
            .find(|candidate| candidate.get("name").and_then(Value::as_str) == Some(field_name));
        let field = matching.unwrap_or_else(|| panic!("missing field {field_name}"));
        let ty = field
            .get("type")
            .and_then(Value::as_object)
            .unwrap_or_else(|| panic!("field {field_name} type must be object"));

        let avro_type = ty.get("type").and_then(Value::as_str);
        assert_eq!(avro_type, Some("fixed"));
        assert!(
            !ty.contains_key("namespace"),
            "field {field_name} should not include namespace when metadata lacks avro.namespace"
        );
    }
}
2887
/// Each listed Arrow temporal type must produce a schema whose JSON contains the
/// paired Avro `logicalType` entry. In these cases a timestamp without a time zone
/// is expected as `local-timestamp-*`, and one with `+00:00` as `timestamp-*`.
#[test]
fn test_temporal_mappings() {
    // A fixed-size array is enough: the cases are consumed exactly once, by value.
    let cases = [
        (DataType::Date32, "\"logicalType\":\"date\""),
        (
            DataType::Time32(TimeUnit::Millisecond),
            "\"logicalType\":\"time-millis\"",
        ),
        (
            DataType::Time64(TimeUnit::Microsecond),
            "\"logicalType\":\"time-micros\"",
        ),
        (
            DataType::Timestamp(TimeUnit::Millisecond, None),
            "\"logicalType\":\"local-timestamp-millis\"",
        ),
        (
            DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
            "\"logicalType\":\"timestamp-micros\"",
        ),
    ];
    for (dt, needle) in cases {
        // `dt` is owned by the loop, so it is moved into the field instead of cloned.
        let field = ArrowField::new("ts", dt, true);
        let arrow_schema = single_field_schema(field);
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, needle);
    }
}
2916
/// A `Decimal128(25, 2)` field must surface the decimal logical type with its
/// precision and scale, and a 16-byte fixed binary field tagged
/// `logicalType = uuid` must surface the uuid logical type.
#[test]
fn test_decimal_and_uuid() {
    // Decimal part.
    let dec_schema = single_field_schema(ArrowField::new(
        "amount",
        DataType::Decimal128(25, 2),
        false,
    ));
    let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
    for needle in [
        "\"logicalType\":\"decimal\"",
        "\"precision\":25",
        "\"scale\":2",
    ] {
        assert_json_contains(&avro_dec.json_string, needle);
    }

    // UUID part.
    let uuid_md = HashMap::from([("logicalType".to_string(), "uuid".to_string())]);
    let uuid_field =
        ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(uuid_md);
    let uuid_schema = single_field_schema(uuid_field);
    let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
    assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
}
2933
/// Without custom types, a `MonthDayNano` interval must be written with the
/// `duration` logical type and a size of 12.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_interval_month_day_nano_duration_schema() {
    let interval_dt = DataType::Interval(IntervalUnit::MonthDayNano);
    let arrow_schema = single_field_schema(ArrowField::new("span", interval_dt, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let rendered = &avro.json_string;
    assert_json_contains(rendered, "\"logicalType\":\"duration\"");
    assert_json_contains(rendered, "\"size\":12");
}
2947
/// With custom types, a `MonthDayNano` interval must be written with the
/// `arrow.interval-month-day-nano` logical type and a size of 16.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_schema() {
    let interval_dt = DataType::Interval(IntervalUnit::MonthDayNano);
    let arrow_schema = single_field_schema(ArrowField::new("span", interval_dt, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let rendered = &avro.json_string;
    assert_json_contains(
        rendered,
        "\"logicalType\":\"arrow.interval-month-day-nano\"",
    );
    assert_json_contains(rendered, "\"size\":16");
}
2964
/// With custom types, a nanosecond `Duration` field must carry the
/// `arrow.duration-nanos` logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_custom_logical_type() {
    let nanos_dt = DataType::Duration(TimeUnit::Nanosecond);
    let arrow_schema = single_field_schema(ArrowField::new("latency", nanos_dt, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(
        &avro.json_string,
        "\"logicalType\":\"arrow.duration-nanos\"",
    );
}
2976
/// Checks the Avro JSON emitted for three nested Arrow types:
/// a list (expects an `array` with `items`), a map (expects a `map` with
/// `values`), and a nullable struct (expects a `record` plus a `"null"` token).
#[test]
fn test_complex_types() {
    // List<Int32>
    let list_dt = DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true)));
    let list_schema = single_field_schema(ArrowField::new("numbers", list_dt, false));
    let avro_list = AvroSchema::try_from(&list_schema).unwrap();
    assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
    assert_json_contains(&avro_list.json_string, "\"items\"");

    // Map<Utf8, Boolean>
    let value_field = ArrowField::new("value", DataType::Boolean, true);
    let entries_struct = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("key", DataType::Utf8, false),
            // `value_field` is not used again, so it is moved rather than cloned.
            value_field,
        ])),
        false,
    );
    let map_dt = DataType::Map(Arc::new(entries_struct), false);
    let map_schema = single_field_schema(ArrowField::new("props", map_dt, false));
    let avro_map = AvroSchema::try_from(&map_schema).unwrap();
    assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
    assert_json_contains(&avro_map.json_string, "\"values\"");

    // Nullable Struct { f1: Int64, f2: Utf8? }
    let struct_dt = DataType::Struct(Fields::from(vec![
        ArrowField::new("f1", DataType::Int64, false),
        ArrowField::new("f2", DataType::Utf8, true),
    ]));
    let struct_schema = single_field_schema(ArrowField::new("person", struct_dt, true));
    let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
    assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
    assert_json_contains(&avro_struct.json_string, "\"null\"");
}
3007
/// A dictionary-encoded string field with enum-symbol metadata must be written
/// as an Avro `enum` listing those symbols.
#[test]
fn test_enum_dictionary() {
    let symbols_md = HashMap::from([(
        AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
        "[\"OPEN\",\"CLOSED\"]".to_string(),
    )]);
    let dictionary_dt = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let status_field = ArrowField::new("status", dictionary_dt, false).with_metadata(symbols_md);
    let arrow_schema = single_field_schema(status_field);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let rendered = &avro.json_string;
    assert_json_contains(rendered, "\"type\":\"enum\"");
    assert_json_contains(rendered, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
}
3022
/// A run-end-encoded field with `Utf8` values must produce JSON containing the
/// `"string"` token.
#[test]
fn test_run_end_encoded() {
    let run_ends = Arc::new(ArrowField::new("run_ends", DataType::Int32, false));
    let values = Arc::new(ArrowField::new("values", DataType::Utf8, false));
    let ree_dt = DataType::RunEndEncoded(run_ends, values);
    let arrow_schema = single_field_schema(ArrowField::new("text", ree_dt, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&avro.json_string, "\"string\"");
}
3033
/// A dense Arrow union with type ids 2 and 7 must become a two-branch Avro union
/// whose first branch is an `int` object carrying the union mode and the type ids
/// in order, and whose second branch is the plain `"string"` token.
#[test]
fn test_dense_union() {
    let branches: UnionFields = [
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let union_dt = DataType::Union(branches, UnionMode::Dense);
    let arrow_schema = single_field_schema(ArrowField::new("u", union_dt, false));
    let avro = AvroSchema::try_from(&arrow_schema)
        .expect("Arrow Union -> Avro union conversion should succeed");

    // Locate field "u" in the generated record.
    let root: Value = serde_json::from_str(&avro.json_string).unwrap();
    let fields = root
        .get("fields")
        .and_then(Value::as_array)
        .expect("fields array");
    let u_field = fields
        .iter()
        .find(|candidate| candidate.get("name").and_then(Value::as_str) == Some("u"))
        .expect("field 'u'");
    let arr = u_field
        .get("type")
        .expect("u.type")
        .as_array()
        .expect("u.type must be Avro union array");
    assert_eq!(arr.len(), 2, "expected two union branches");

    // First branch: object form carrying the Arrow union metadata.
    let obj = arr[0]
        .as_object()
        .expect("first branch should be an object with metadata");
    assert_eq!(obj.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        obj.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let type_ids: Vec<i64> = obj
        .get("arrowUnionTypeIds")
        .and_then(Value::as_array)
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|id| id.as_i64().expect("i64"))
        .collect();
    assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");

    // Second branch: plain primitive token.
    assert_eq!(arr[1], Value::String("string".into()));
}
3077
/// An Arrow schema with a single `Int32` field converts to an Avro schema that
/// parses back as a complex schema.
#[test]
fn round_trip_primitive() {
    let only_field = ArrowField::new("f1", DataType::Int32, false);
    let arrow_schema = ArrowSchema::new(vec![only_field]);
    let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
    let reparsed = avro_schema.schema().unwrap();
    assert!(matches!(reparsed, Schema::Complex(_)));
}
3085
/// Field names that are not valid Avro names must be sanitized, and names that
/// collide after sanitization must be made unique. The expected results are
/// `weird_name`, `weird_name_1` and `_123bad`.
#[test]
fn test_name_generator_sanitization_and_uniqueness() {
    let awkward_fields: Vec<ArrowField> = ["weird-name", "weird name", "123bad"]
        .into_iter()
        .map(|raw_name| ArrowField::new(raw_name, DataType::FixedSizeBinary(8), false))
        .collect();
    let arrow_schema = ArrowSchema::new(awkward_fields);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let rendered = &avro.json_string;
    assert_json_contains(rendered, "\"name\":\"weird_name\"");
    assert_json_contains(rendered, "\"name\":\"weird_name_1\"");
    assert_json_contains(rendered, "\"name\":\"_123bad\"");
}
3097
/// Without custom types, a `Date64` field must be written with the
/// `local-timestamp-millis` logical type.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_date64_logical_type_mapping() {
    let arrow_schema = single_field_schema(ArrowField::new("d", DataType::Date64, true));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let needle = "\"logicalType\":\"local-timestamp-millis\"";
    assert_json_contains(&avro.json_string, needle);
}
3109
/// With custom types, a `Date64` field must carry the `arrow.date64` logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_date64_logical_type_mapping_custom() {
    let arrow_schema = single_field_schema(ArrowField::new("d", DataType::Date64, true));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let needle = "\"logicalType\":\"arrow.date64\"";
    assert_json_contains(&avro.json_string, needle);
}
3118
/// With custom types, a list of microsecond durations must still expose the
/// `arrow.duration-micros` logical type in the generated JSON.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_list_extras_propagated() {
    let element = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
    let durations = ArrowField::new("durations", DataType::List(Arc::new(element)), false);
    let arrow_schema = single_field_schema(durations);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let needle = "\"logicalType\":\"arrow.duration-micros\"";
    assert_json_contains(&avro.json_string, needle);
}
3131
/// Without custom types, a `YearMonth` interval must record its unit as
/// `arrowIntervalUnit = yearmonth`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_interval_yearmonth_extra() {
    let year_month = DataType::Interval(IntervalUnit::YearMonth);
    let arrow_schema = single_field_schema(ArrowField::new("iv", year_month, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
}
3140
/// Without custom types, a `DayTime` interval must record its unit as
/// `arrowIntervalUnit = daytime`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_interval_daytime_extra() {
    let day_time = DataType::Interval(IntervalUnit::DayTime);
    let arrow_schema = single_field_schema(ArrowField::new("iv_dt", day_time, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"daytime\"");
}
3149
/// With custom types enabled, a year-month interval column must use the
/// `arrow.interval-year-month` logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_yearmonth_custom() {
    let column = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
    let arrow_schema = single_field_schema(column);
    let json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&json, "\"logicalType\":\"arrow.interval-year-month\"");
}
3161
/// With custom types enabled, a day-time interval column must use the
/// `arrow.interval-day-time` logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_daytime_custom() {
    let column = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
    let arrow_schema = single_field_schema(column);
    let json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&json, "\"logicalType\":\"arrow.interval-day-time\"");
}
3173
/// A fixed-size list of three `Int32` items must record its length through
/// the `arrowFixedSize` extra.
#[test]
fn test_fixed_size_list_extra() {
    let item = Arc::new(ArrowField::new("item", DataType::Int32, false));
    let column = ArrowField::new("triples", DataType::FixedSizeList(item, 3), false);
    let arrow_schema = single_field_schema(column);
    let json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&json, "\"arrowFixedSize\":3");
}
3182
/// A map whose values are nullable second-resolution durations must expose
/// the `arrow.duration-seconds` logical type in the generated Avro JSON.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_map_duration_value_extra() {
    let entry_fields = Fields::from(vec![
        ArrowField::new("key", DataType::Utf8, false),
        ArrowField::new("value", DataType::Duration(TimeUnit::Second), true),
    ]);
    let entries = Arc::new(ArrowField::new(
        "entries",
        DataType::Struct(entry_fields),
        false,
    ));
    let column = ArrowField::new("metrics", DataType::Map(entries, false), false);
    let arrow_schema = single_field_schema(column);
    let json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&json, "\"logicalType\":\"arrow.duration-seconds\"");
}
3203
/// A record whose fields carry non-string defaults (number, array, map,
/// nested record, union) must parse and convert to the expected Arrow field.
#[test]
fn test_schema_with_non_string_defaults_decodes_successfully() {
    let schema: Schema = serde_json::from_str(
        r#"{
            "type": "record",
            "name": "R",
            "fields": [
                {"name": "a", "type": "int", "default": 0},
                {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
                {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
                {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
                    {"name": "flag", "type": "boolean", "default": true},
                    {"name": "name", "type": "string", "default": "hi"}
                ]}, "default": {"flag": false, "name": "d"}},
                {"name": "u", "type": ["int", "null"], "default": 42}
            ]
        }"#,
    )
    .expect("schema should parse");
    // The parsed root has to be a record before conversion is attempted.
    assert!(
        matches!(&schema, Schema::Complex(ComplexType::Record(_))),
        "expected record schema, got: {:?}",
        &schema
    );
    let decoded = crate::codec::AvroField::try_from(&schema)
        .expect("Avro->Arrow conversion should succeed")
        .field();

    // Field "b": list of non-nullable Int64 under the default list item name.
    let list_item = ArrowField::new(
        arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
        DataType::Int64,
        false,
    );
    // Field "c": map entries struct with Utf8 keys and Float64 values.
    let map_entries = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("key", DataType::Utf8, false),
            ArrowField::new("value", DataType::Float64, false),
        ])),
        false,
    );
    // Field "inner": nested struct tagged with its Avro record name.
    let inner = ArrowField::new(
        "inner",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("flag", DataType::Boolean, false),
            ArrowField::new("name", DataType::Utf8, false),
        ])),
        false,
    )
    .with_metadata(std::collections::HashMap::from([(
        AVRO_NAME_METADATA_KEY.to_string(),
        "Inner".to_string(),
    )]));
    let children = vec![
        ArrowField::new("a", DataType::Int32, false),
        ArrowField::new("b", DataType::List(Arc::new(list_item)), false),
        ArrowField::new("c", DataType::Map(Arc::new(map_entries), false), false),
        inner,
        ArrowField::new("u", DataType::Int32, true),
    ];
    let expected = ArrowField::new("R", DataType::Struct(Fields::from(children)), false)
        .with_metadata(std::collections::HashMap::from([(
            AVRO_NAME_METADATA_KEY.to_string(),
            "R".to_string(),
        )]));
    assert_eq!(decoded, expected);
}
3275
/// `TryFrom<&ArrowSchema>` and `from_arrow_with_options(.., None)` must
/// produce the same Avro JSON for a nullable column.
#[test]
fn default_order_is_consistent() {
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
    let via_try_from = AvroSchema::try_from(&arrow_schema).unwrap();
    let via_options = AvroSchema::from_arrow_with_options(&arrow_schema, None).unwrap();
    assert_eq!(via_try_from.json_string, via_options.json_string);
}
3283
/// Each named Avro kind (`record`, `enum`, `fixed`) given as a union branch
/// without a `name` must be rejected with a missing-name error.
#[test]
fn test_union_branch_missing_name_errors() {
    for t in ["record", "enum", "fixed"] {
        let err = union_branch_signature(&json!({ "type": t }))
            .unwrap_err()
            .to_string();
        let wanted = format!("Union branch '{t}' missing required 'name'");
        assert!(
            err.contains(wanted.as_str()),
            "expected missing-name error for {t}, got: {err}"
        );
    }
}
3295
/// Named union branches must yield a signature of the form
/// `N:<kind>:<name>`.
#[test]
fn test_union_branch_named_type_signature_includes_name() {
    let cases = [
        (json!({ "type": "record", "name": "Foo" }), "N:record:Foo"),
        (
            json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] }),
            "N:enum:Color",
        ),
        (
            json!({ "type": "fixed", "name": "Bytes16", "size": 16 }),
            "N:fixed:Bytes16",
        ),
    ];
    for (branch, signature) in &cases {
        assert_eq!(union_branch_signature(branch).unwrap(), *signature);
    }
}
3305
/// A reader field that lists the writer's field name as an alias must resolve
/// to the reader's name, even though the reader field has no default.
#[test]
fn test_record_field_alias_resolution_without_default() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"old","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"new","aliases":["old"],"type":"int"}]
        }"#,
    )
    .unwrap();
    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    let renamed = ArrowField::new("new", DataType::Int32, false);
    let expected = ArrowField::new("R", DataType::Struct(Fields::from(vec![renamed])), false);
    assert_eq!(resolved.field(), expected);
}
3337
/// In strict mode, two writer fields that share the alias `old` must make
/// resolution fail with an ambiguous-alias error.
#[test]
fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int","aliases":["old"]},
                {"name":"b","type":"int","aliases":["old"]}
            ]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"target","type":"int","aliases":["old"]}]
        }"#,
    )
    .unwrap();
    let outcome = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(true)
        .build();
    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("Ambiguous alias 'old'"),
        "expected ambiguous-alias error, got: {err}"
    );
}
3367
/// In non-strict mode, an alias declared on the writer field that matches
/// the reader's field name must be enough to map the two fields.
#[test]
fn test_pragmatic_writer_field_alias_mapping_non_strict() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"before","type":"int","aliases":["now"]}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"now","type":"int"}]
        }"#,
    )
    .unwrap();
    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    let mapped = ArrowField::new("now", DataType::Int32, false);
    let expected = ArrowField::new("R", DataType::Struct(Fields::from(vec![mapped])), false);
    assert_eq!(resolved.field(), expected);
}
3399
/// A reader-only field typed `["null", "int"]` with no explicit default must
/// resolve successfully and carry a `null` default in its field metadata.
#[test]
fn test_missing_reader_field_null_first_no_default_is_ok() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"a","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int"},
                {"name":"b","type":["null","int"]}
            ]
        }"#,
    )
    .unwrap();
    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    // The field absent from the writer is expected to record its default.
    let mut default_md = HashMap::new();
    default_md.insert(
        AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
        "null".to_string(),
    );
    let children = vec![
        ArrowField::new("a", DataType::Int32, false),
        ArrowField::new("b", DataType::Int32, true).with_metadata(default_md),
    ];
    let expected = ArrowField::new("R", DataType::Struct(Fields::from(children)), false);
    assert_eq!(resolved.field(), expected);
}
3436
/// A reader-only field typed `["int", "null"]` with no default must make
/// resolution fail with a missing-default error.
#[test]
fn test_missing_reader_field_null_second_without_default_errors() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"a","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int"},
                {"name":"b","type":["int","null"]}
            ]
        }"#,
    )
    .unwrap();
    let outcome = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("must have a default value"),
        "expected missing-default error, got: {err}"
    );
}
3466
/// With `strip_metadata = false`, an Avro schema embedded under the
/// `avro.schema` metadata key must be returned verbatim.
#[test]
fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
    let injected_json =
        r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
            .to_string();
    let metadata = HashMap::from([
        (SCHEMA_METADATA_KEY.to_string(), injected_json.clone()),
        ("custom".to_string(), "123".to_string()),
    ]);
    let arrow_schema = ArrowSchema::new_with_metadata(
        vec![ArrowField::new("x", DataType::Int32, true)],
        metadata,
    );
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: false,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    assert_eq!(
        out.json_string, injected_json,
        "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
    );
    // The returned JSON must still be the injected record.
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    assert_eq!(parsed["type"].as_str(), Some("record"));
    assert_eq!(parsed["name"].as_str(), Some("Injected"));
}
3490
/// With `strip_metadata = true`, the embedded `avro.schema` is ignored, the
/// schema is rebuilt as `topLevelRecord`, and `custom_meta` is passed through.
#[test]
fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
    let injected_json =
        r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
            .to_string();
    let metadata = HashMap::from([
        (SCHEMA_METADATA_KEY.to_string(), injected_json),
        ("custom_meta".to_string(), "7".to_string()),
    ]);
    let arrow_schema = ArrowSchema::new_with_metadata(
        vec![ArrowField::new("x", DataType::Int32, true)],
        metadata,
    );
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let json = &out.json_string;
    assert_json_contains(json, "\"type\":\"record\"");
    assert_json_contains(json, "\"name\":\"topLevelRecord\"");
    assert_json_contains(json, "\"custom_meta\":7");
}
3510
/// `Nullability::NullFirst` must place `"null"` before `"string"` in the
/// union emitted for a nullable Utf8 column.
#[test]
fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = parsed["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");
    assert_eq!(branches[0].as_str(), Some("null"));
    assert_eq!(branches[1].as_str(), Some("string"));
}
3527
/// `Nullability::NullSecond` must place `"string"` before `"null"` in the
/// union emitted for a nullable Utf8 column.
#[test]
fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: true,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = parsed["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");
    assert_eq!(branches[0].as_str(), Some("string"));
    assert_eq!(branches[1].as_str(), Some("null"));
}
3544
/// Converts a nullable dense union twice: with `strip_metadata = false` the
/// first object branch must carry the Arrow union extras (mode, type ids);
/// with `strip_metadata = true` the branches must be bare type names.
#[test]
fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
    let uf: UnionFields = vec![
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();
    let column = ArrowField::new("u", DataType::Union(uf, UnionMode::Dense), true);
    let arrow_schema = single_field_schema(column);
    // Both conversions differ only in the `strip_metadata` flag.
    let convert = |strip_metadata: bool| {
        AvroSchema::from_arrow_with_options(
            &arrow_schema,
            Some(AvroSchemaOptions {
                null_order: Some(Nullability::NullFirst),
                strip_metadata,
            }),
        )
        .unwrap()
    };

    // Extras kept: the first object branch describes the union layout.
    let with_extras = convert(false);
    let v_with: Value = serde_json::from_str(&with_extras.json_string).unwrap();
    let union_arr = v_with["fields"][0]["type"].as_array().expect("union array");
    let obj = union_arr
        .iter()
        .find_map(Value::as_object)
        .expect("expected an object branch with extras");
    assert_eq!(obj.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        obj.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let type_ids = obj["arrowUnionTypeIds"]
        .as_array()
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|n| n.as_i64().expect("i64"))
        .collect::<Vec<i64>>();
    assert_eq!(type_ids, vec![2, 7]);

    // Extras stripped: no branch may mention the union mode.
    let stripped = convert(true);
    let v_stripped: Value = serde_json::from_str(&stripped.json_string).unwrap();
    let union_arr2 = v_stripped["fields"][0]["type"]
        .as_array()
        .expect("union array");
    assert!(
        union_arr2
            .iter()
            .filter_map(Value::as_object)
            .all(|m| !m.contains_key("arrowUnionMode")),
        "extras must be removed when strip_metadata=true"
    );
    assert_eq!(union_arr2[0].as_str(), Some("null"));
    assert_eq!(union_arr2[1].as_str(), Some("int"));
    assert_eq!(union_arr2[2].as_str(), Some("string"));
}
3604
/// Projecting with no indices must yield a record with an empty `fields`
/// array.
#[test]
fn test_project_empty_projection() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let fields = parsed["fields"].as_array().unwrap();
    assert!(
        fields.is_empty(),
        "Empty projection should yield empty fields"
    );
}
3624
/// Projecting index 1 of a three-field record must keep only field `b`.
#[test]
fn test_project_single_field() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<_> = parsed["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|f| f.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, [Some("b")]);
}
3643
/// Projecting indices 0, 2 and 3 must keep fields `a`, `c`, `d` in that
/// order.
#[test]
fn test_project_multiple_fields() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"},
                {"name": "d", "type": "boolean"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[0, 2, 3]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<_> = parsed["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|f| f.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, [Some("a"), Some("c"), Some("d")]);
}
3665
/// Projecting every index in declaration order must keep all fields
/// unchanged.
#[test]
fn test_project_all_fields() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[0, 1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<_> = parsed["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|f| f.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, [Some("a"), Some("b")]);
}
3684
/// The projected field order must follow the order of the requested indices
/// (2, 0, 1 yields `c`, `a`, `b`).
#[test]
fn test_project_reorder_fields() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[2, 0, 1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<_> = parsed["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|f| f.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, [Some("c"), Some("a"), Some("b")]);
}
3706
/// Record-level attributes (`name`, `namespace`, `doc`, `aliases`) must
/// survive a projection.
#[test]
fn test_project_preserves_record_metadata() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "MyRecord",
            "namespace": "com.example",
            "doc": "A test record",
            "aliases": ["OldRecord"],
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[0]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    assert_eq!(parsed["name"].as_str(), Some("MyRecord"));
    assert_eq!(parsed["namespace"].as_str(), Some("com.example"));
    assert_eq!(parsed["doc"].as_str(), Some("A test record"));
    // Only the presence of the key is checked for aliases.
    assert!(parsed.get("aliases").is_some());
}
3731
/// Field-level attributes (`doc`, `default`) must survive a projection.
#[test]
fn test_project_preserves_field_metadata() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int", "doc": "Field A", "default": 0},
                {"name": "b", "type": "string"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[0]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let fields = parsed["fields"].as_array().unwrap();
    let kept = &fields[0];
    assert_eq!(kept["doc"].as_str(), Some("Field A"));
    assert_eq!(kept["default"].as_i64(), Some(0));
}
3752
/// Projecting a field whose type is a nested record must keep that nested
/// record definition (type `record`, name `Inner`).
#[test]
fn test_project_with_nested_record() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Outer",
            "fields": [
                {"name": "id", "type": "int"},
                {"name": "inner", "type": {
                    "type": "record",
                    "name": "Inner",
                    "fields": [
                        {"name": "x", "type": "int"},
                        {"name": "y", "type": "string"}
                    ]
                }},
                {"name": "value", "type": "double"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let fields = parsed["fields"].as_array().unwrap();
    assert_eq!(fields.len(), 1);
    let kept = &fields[0];
    assert_eq!(kept["name"].as_str(), Some("inner"));
    // The nested record must still be present as the field's type.
    let inner_type = kept.get("type").unwrap();
    assert_eq!(inner_type["type"].as_str(), Some("record"));
    assert_eq!(inner_type["name"].as_str(), Some("Inner"));
}
3791
/// Projection must keep complex field types: the array field stays an
/// `array` object and the union field stays a JSON array.
#[test]
fn test_project_with_complex_field_types() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "arr", "type": {"type": "array", "items": "int"}},
                {"name": "map", "type": {"type": "map", "values": "string"}},
                {"name": "union", "type": ["null", "int"]}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[0, 2]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let fields = parsed["fields"].as_array().unwrap();
    assert_eq!(fields.len(), 2);
    // First kept field: the array type object.
    let arr_type = fields[0].get("type").unwrap();
    assert_eq!(arr_type["type"].as_str(), Some("array"));
    // Second kept field: the union, encoded as a JSON array.
    let union_type = fields[1].get("type").unwrap();
    assert!(union_type.is_array());
}
3815
/// Projecting a schema whose text is not JSON must report a parse error.
#[test]
fn test_project_error_invalid_json() {
    let msg = AvroSchema::new("not valid json".to_string())
        .project(&[0])
        .unwrap_err()
        .to_string();
    assert!(
        msg.contains("Invalid Avro schema JSON"),
        "Expected parse error, got: {msg}"
    );
}
3826
/// Projecting a schema that is a bare JSON string must be rejected because
/// the root is not a JSON object.
#[test]
fn test_project_error_not_object() {
    let msg = AvroSchema::new(r#""string""#.to_string())
        .project(&[0])
        .unwrap_err()
        .to_string();
    assert!(
        msg.contains("must be a JSON object"),
        "Expected object error, got: {msg}"
    );
}
3838
/// Projecting a top-level union (JSON array) schema must be rejected because
/// the root is not a JSON object.
#[test]
fn test_project_error_array_schema() {
    let msg = AvroSchema::new(r#"["null", "int"]"#.to_string())
        .project(&[0])
        .unwrap_err()
        .to_string();
    assert!(
        msg.contains("must be a JSON object"),
        "Expected object error for array schema, got: {msg}"
    );
}
3850
/// Projecting an `enum` schema must fail with a not-a-record error that
/// names the offending type.
#[test]
fn test_project_error_type_not_record() {
    let schema = AvroSchema::new(
        r#"{
            "type": "enum",
            "name": "Color",
            "symbols": ["RED", "GREEN", "BLUE"]
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let needles = ["must be an Avro record", "'enum'"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected type mismatch error, got: {msg}"
    );
}
3866
/// Projecting an `array` schema must fail with a not-a-record error that
/// names the offending type.
#[test]
fn test_project_error_type_array() {
    let schema = AvroSchema::new(
        r#"{
            "type": "array",
            "items": "int"
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let needles = ["must be an Avro record", "'array'"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected type mismatch error for array type, got: {msg}"
    );
}
3881
/// Projecting a `fixed` schema must fail with a not-a-record error that
/// names the offending type.
#[test]
fn test_project_error_type_fixed() {
    let schema = AvroSchema::new(
        r#"{
            "type": "fixed",
            "name": "MD5",
            "size": 16
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let needles = ["must be an Avro record", "'fixed'"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected type mismatch error for fixed type, got: {msg}"
    );
}
3897
/// Projecting a `map` schema must fail with a not-a-record error that names
/// the offending type.
#[test]
fn test_project_error_type_map() {
    let schema = AvroSchema::new(
        r#"{
            "type": "map",
            "values": "string"
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let needles = ["must be an Avro record", "'map'"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected type mismatch error for map type, got: {msg}"
    );
}
3912
/// Projecting a schema object that lacks a `type` key must report the
/// missing field.
#[test]
fn test_project_error_missing_type_field() {
    let schema = AvroSchema::new(
        r#"{
            "name": "Test",
            "fields": [{"name": "a", "type": "int"}]
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    assert!(
        msg.contains("missing required 'type' field"),
        "Expected missing type error, got: {msg}"
    );
}
3927
/// Projecting a record that lacks a `fields` key must report the missing
/// attribute.
#[test]
fn test_project_error_missing_fields() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test"
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    assert!(
        msg.contains("missing required 'fields'"),
        "Expected missing fields error, got: {msg}"
    );
}
3942
/// Projecting a record whose `fields` value is a string must be rejected
/// because `fields` is not an array.
#[test]
fn test_project_error_fields_not_array() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": "not an array"
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    assert!(
        msg.contains("'fields' must be an array"),
        "Expected fields array error, got: {msg}"
    );
}
3958
/// Requesting index 5 of a two-field record must fail with an out-of-bounds
/// error whose text contains both "5" and "2".
#[test]
fn test_project_error_index_out_of_bounds() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#
        .to_string(),
    );
    let msg = schema.project(&[5]).unwrap_err().to_string();
    let needles = ["out of bounds", "5", "2"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected out of bounds error, got: {msg}"
    );
}
3977
/// An index equal to the field count (1 for a one-field record) is the first
/// invalid index and must be rejected as out of bounds.
#[test]
fn test_project_error_index_out_of_bounds_edge() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"}
            ]
        }"#
        .to_string(),
    );
    let msg = schema.project(&[1]).unwrap_err().to_string();
    let needles = ["out of bounds", "1"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected out of bounds error for edge case, got: {msg}"
    );
}
3996
/// Repeating index 0 in a projection (non-adjacent positions) must fail with
/// a duplicate-index error.
#[test]
fn test_project_error_duplicate_index() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"}
            ]
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0, 1, 0]).unwrap_err().to_string();
    let needles = ["Duplicate projection index", "0"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected duplicate index error, got: {msg}"
    );
}
4016
/// Repeating index 1 in adjacent positions must fail with a duplicate-index
/// error.
#[test]
fn test_project_error_duplicate_index_consecutive() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#
        .to_string(),
    );
    let msg = schema.project(&[1, 1]).unwrap_err().to_string();
    let needles = ["Duplicate projection index", "1"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected duplicate index error for consecutive duplicates, got: {msg}"
    );
}
4035
/// An empty projection of a record that has no fields must succeed and keep
/// `fields` empty.
#[test]
fn test_project_with_empty_fields() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "EmptyRecord",
            "fields": []
        }"#
        .to_string(),
    );
    let projected = schema.project(&[]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let fields = parsed["fields"].as_array().unwrap();
    assert!(fields.is_empty());
}
4050
/// Any index into a record with no fields must be out of bounds, and the
/// error text must mention "0 fields".
#[test]
fn test_project_empty_fields_index_out_of_bounds() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "EmptyRecord",
            "fields": []
        }"#
        .to_string(),
    );
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let needles = ["out of bounds", "0 fields"];
    assert!(
        needles.iter().all(|needle| msg.contains(*needle)),
        "Expected out of bounds error for empty record, got: {msg}"
    );
}
4066
/// The JSON produced by a projection must parse back into a record `Schema`
/// with the original name and namespace and only the selected fields.
#[test]
fn test_project_result_is_valid_avro_schema() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "namespace": "com.example",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "name", "type": "string"},
                {"name": "active", "type": "boolean"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[0, 2]).unwrap();
    let parsed = projected.schema();
    assert!(parsed.is_ok(), "Projected schema should be valid Avro");
    let Schema::Complex(ComplexType::Record(record)) = parsed.unwrap() else {
        panic!("Expected Record schema")
    };
    assert_eq!(record.name, "Test");
    assert_eq!(record.namespace, Some("com.example"));
    assert_eq!(record.fields.len(), 2);
    assert_eq!(record.fields[0].name, "id");
    assert_eq!(record.fields[1].name, "active");
}
4095
/// Projecting every other field (indices 0, 2, 4) must keep `f0`, `f2`, `f4`
/// in that order.
#[test]
fn test_project_non_contiguous_indices() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "f0", "type": "int"},
                {"name": "f1", "type": "int"},
                {"name": "f2", "type": "int"},
                {"name": "f3", "type": "int"},
                {"name": "f4", "type": "int"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[0, 2, 4]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<_> = parsed["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|f| f.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, [Some("f0"), Some("f2"), Some("f4")]);
}
4119
/// Projecting only the last of ten fields must keep exactly `f9`.
#[test]
fn test_project_single_field_from_many() {
    let schema = AvroSchema::new(
        r#"{
            "type": "record",
            "name": "BigRecord",
            "fields": [
                {"name": "f0", "type": "int"},
                {"name": "f1", "type": "int"},
                {"name": "f2", "type": "int"},
                {"name": "f3", "type": "int"},
                {"name": "f4", "type": "int"},
                {"name": "f5", "type": "int"},
                {"name": "f6", "type": "int"},
                {"name": "f7", "type": "int"},
                {"name": "f8", "type": "int"},
                {"name": "f9", "type": "int"}
            ]
        }"#
        .to_string(),
    );
    let projected = schema.project(&[9]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<_> = parsed["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|f| f.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, [Some("f9")]);
}
4146}