Skip to main content

arrow_avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Schema representations for Arrow.
19
20#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23    ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24    UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
36/// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
37pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
38
39/// The Confluent "magic" byte (`0x00`)
40pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];
41
42/// The maximum possible length of a prefix.
43/// SHA256 (32) + single-object magic (2)
44pub const MAX_PREFIX_LEN: usize = 34;
45
46/// The metadata key used for storing the JSON encoded `Schema`
47pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
48
49/// Metadata key used to represent Avro enum symbols in an Arrow schema.
50pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";
51
52/// Metadata key used to store the default value of a field in an Avro schema.
53pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";
54
55/// Metadata key used to store the name of a type in an Avro schema.
56pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";
57
/// Metadata key used to store the namespace of a type in an Avro schema.
59pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";
60
61/// Metadata key used to store the documentation for a type in an Avro schema.
62pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";
63
64/// Default name for the root record in an Avro schema.
65pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
/// Avro types are not nullable, with nullability instead encoded as a union
/// where one of the variants is the null type.
///
/// To accommodate this, we special-case two-variant unions where one of the
/// variants is the null type, and use this to derive Arrow's notion of nullability.
///
/// The variant records which position of the union holds the null type;
/// [`Nullability::NullFirst`] is the [`Default`].
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) enum Nullability {
    /// The nulls are encoded as the first union variant
    #[default]
    NullFirst,
    /// The nulls are encoded as the second union variant
    NullSecond,
}
80
/// A type name in an Avro schema: either a [`PrimitiveType`] or a reference
/// to a previously defined named type.
///
/// This represents the different ways a type can be referenced in an Avro schema.
/// The enum is `#[serde(untagged)]`, so no variant tag is written around the value.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum TypeName<'a> {
    /// A primitive type like null, boolean, int, etc.
    Primitive(PrimitiveType),
    /// A reference to another named type, held as a string slice with lifetime `'a`
    Ref(&'a str),
}
95
/// A primitive type
///
/// Every variant name is a single word, so both the serde `camelCase` rename
/// and the strum `lowercase` rename yield the lower-case Avro spelling
/// (for example `Null` becomes `null`).
///
/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub(crate) enum PrimitiveType {
    /// null: no value
    Null,
    /// boolean: a binary value
    Boolean,
    /// int: 32-bit signed integer
    Int,
    /// long: 64-bit signed integer
    Long,
    /// float: single precision (32-bit) IEEE 754 floating-point number
    Float,
    /// double: double precision (64-bit) IEEE 754 floating-point number
    Double,
    /// bytes: sequence of 8-bit unsigned bytes
    Bytes,
    /// string: Unicode character sequence
    String,
}
120
/// Additional attributes within a `Schema`
///
/// Field names are renamed with `camelCase`, so `logical_type` corresponds to
/// the JSON key `logicalType`.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Attributes<'a> {
    /// A logical type name
    ///
    /// Falls back to `None` when the key is absent (`#[serde(default)]`).
    ///
    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
    #[serde(default)]
    pub(crate) logical_type: Option<&'a str>,

    /// Additional JSON attributes
    ///
    /// Flattened into the enclosing JSON object: keys that are not consumed by
    /// a named field are gathered here together with their raw JSON values.
    #[serde(flatten)]
    pub(crate) additional: HashMap<&'a str, Value>,
}
137
138impl Attributes<'_> {
139    /// Returns the field metadata for this [`Attributes`]
140    pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
141        self.additional
142            .iter()
143            .map(|(k, v)| (k.to_string(), v.to_string()))
144            .collect()
145    }
146}
147
/// A type definition that is not a variant of [`ComplexType`]
///
/// In JSON this is an object with a `type` key holding a [`TypeName`]; the
/// remaining keys are flattened into `attributes`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Type<'a> {
    /// The type of this Avro data structure
    #[serde(borrow)]
    pub(crate) r#type: TypeName<'a>,
    /// Additional attributes associated with this type
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
159
/// An Avro schema
///
/// This represents the different shapes of Avro schemas as defined in the specification.
/// See <https://avro.apache.org/docs/1.11.1/specification/#schemas> for more details.
///
/// The enum is `#[serde(untagged)]`: no variant tag appears in the JSON, and
/// serde selects a variant from the shape of the value, trying the variants
/// in the order they are declared here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum Schema<'a> {
    /// A direct type name (primitive or reference)
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A union of multiple schemas (e.g., ["null", "string"])
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A complex type such as record, array, map, etc.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// A type with attributes
    #[serde(borrow)]
    Type(Type<'a>),
}
180
/// A complex type
///
/// Internally tagged: the JSON `type` key selects the variant, with variant
/// names renamed to `camelCase` (so `Record` is spelled `record`).
///
/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub(crate) enum ComplexType<'a> {
    /// Record type: a sequence of fields with names and types
    #[serde(borrow)]
    Record(Record<'a>),
    /// Enum type: a set of named values
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// Array type: a sequence of values of the same type
    #[serde(borrow)]
    Array(Array<'a>),
    /// Map type: a mapping from strings to values of the same type
    #[serde(borrow)]
    Map(Map<'a>),
    /// Fixed type: a fixed-size byte array
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
203
/// A record
///
/// `name` and `fields` are required; `namespace`, `doc` and `aliases` fall
/// back to their `Default` when absent. Any other key is collected into
/// `attributes`.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Record<'a> {
    /// Name of the record
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the record, provides a way to organize names
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string for the record
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this record
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The fields contained in this record
    #[serde(borrow)]
    pub(crate) fields: Vec<Field<'a>>,
    /// Additional attributes for this record
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
228
229fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
230where
231    D: serde::Deserializer<'de>,
232{
233    Value::deserialize(deserializer).map(Some)
234}
235
/// A field within a [`Record`]
///
/// `name` and `type` are required; `doc`, `default` and `aliases` fall back
/// to their `Default` when absent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Field<'a> {
    /// Name of the field within the record
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional documentation for this field
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// The field's type definition
    #[serde(borrow)]
    pub(crate) r#type: Schema<'a>,
    /// Optional default value for this field
    ///
    /// Deserialized by `deserialize_default`, which wraps any value that is
    /// present in `Some`; `None` therefore means the key was absent.
    #[serde(deserialize_with = "deserialize_default", default)]
    pub(crate) default: Option<Value>,
    /// Alternative names (aliases) for this field (Avro spec: field-level aliases).
    /// Borrowed from input JSON where possible.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
}
256
/// An enumeration
///
/// `name` and `symbols` are required; `namespace`, `doc`, `aliases` and
/// `default` fall back to their `Default` when absent. Any other key is
/// collected into `attributes`.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Enum<'a> {
    /// Name of the enum
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the enum, provides organizational structure
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string describing the enum
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this enum
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The symbols (values) that this enum can have
    #[serde(borrow)]
    pub(crate) symbols: Vec<&'a str>,
    /// Optional default value for this enum
    #[serde(borrow, default)]
    pub(crate) default: Option<&'a str>,
    /// Additional attributes for this enum
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
284
/// An array
///
/// The element schema is boxed because [`Schema`] is a recursive type.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Array<'a> {
    /// The schema for items in this array
    #[serde(borrow)]
    pub(crate) items: Box<Schema<'a>>,
    /// Additional attributes for this array
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
297
/// A map
///
/// Only the value schema is stored here; it is boxed because [`Schema`] is a
/// recursive type.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Map<'a> {
    /// The schema for values in this map
    #[serde(borrow)]
    pub(crate) values: Box<Schema<'a>>,
    /// Additional attributes for this map
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
310
/// A fixed length binary array
///
/// `name` and `size` are required; `namespace` and `aliases` fall back to
/// their `Default` when absent. Any other key is collected into `attributes`.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Fixed<'a> {
    /// Name of the fixed type
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the fixed type
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Alternative names for this fixed type
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The number of bytes in this fixed type
    pub(crate) size: usize,
    /// Additional attributes for this fixed type
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
331
/// Options consumed by `AvroSchema::from_arrow_with_options` when converting
/// an Arrow schema to Avro JSON.
///
/// The `Default` value is `null_order: None` and `strip_metadata: false`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) struct AvroSchemaOptions {
    /// Position of the null type in generated nullable unions; `None` falls
    /// back to `Nullability::default()`.
    pub(crate) null_order: Option<Nullability>,
    /// When `false`, Avro JSON already stored under `SCHEMA_METADATA_KEY` in
    /// the Arrow schema metadata is returned verbatim instead of being
    /// regenerated. The flag is also forwarded to the per-field conversion.
    pub(crate) strip_metadata: bool,
}
337
/// A wrapper for an Avro schema in its JSON string representation.
///
/// The string is stored as given; it is parsed only when a method needs the
/// structured form, so construction never fails.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The Avro schema as a JSON string.
    pub json_string: String,
}
344
345impl TryFrom<&ArrowSchema> for AvroSchema {
346    type Error = ArrowError;
347
348    /// Converts an `ArrowSchema` to `AvroSchema`, delegating to
349    /// `AvroSchema::from_arrow_with_options` with `None` so that the
350    /// union null ordering is decided by `Nullability::default()`.
351    fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
352        AvroSchema::from_arrow_with_options(schema, None)
353    }
354}
355
356impl AvroSchema {
357    /// Creates a new `AvroSchema` from a JSON string.
358    pub fn new(json_string: String) -> Self {
359        Self { json_string }
360    }
361
362    pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
363        serde_json::from_str(self.json_string.as_str())
364            .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
365    }
366
    /// Returns the fingerprint of the schema, computed using the specified [`FingerprintAlgorithm`].
    ///
    /// The fingerprint is computed over the schema's Parsed Canonical Form
    /// as defined by the Avro specification. Depending on `hash_type`, this
    /// will return one of the supported [`Fingerprint`] variants:
    /// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`]
    /// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5` (requires the `md5` feature)
    /// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256` (requires the `sha256` feature)
    ///
    /// Note: [`FingerprintAlgorithm::Id`] or [`FingerprintAlgorithm::Id64`] cannot be used to generate a fingerprint
    /// and will result in an error. If you intend to use a Schema Registry ID-based
    /// wire format, either use [`SchemaStore::set`] or load the [`Fingerprint::Id`] directly via [`Fingerprint::load_fingerprint_id`] or for
    /// [`Fingerprint::Id64`] via [`Fingerprint::load_fingerprint_id64`].
    ///
    /// See also: <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
    ///
    /// # Errors
    /// Returns an error if deserializing the schema fails, if generating the
    /// canonical form of the schema fails, or if `hash_type` is
    /// [`FingerprintAlgorithm::Id`] or [`FingerprintAlgorithm::Id64`].
    ///
    /// # Examples
    /// ```
    /// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm};
    ///
    /// let avro = AvroSchema::new("\"string\"".to_string());
    /// let fp = avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    /// ```
    pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
        Self::generate_fingerprint(&self.schema()?, hash_type)
    }
397
398    pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
399        let mut value: Value = serde_json::from_str(&self.json_string)
400            .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
401        let obj = value.as_object_mut().ok_or_else(|| {
402            ArrowError::AvroError(
403                "Projected schema must be a JSON object Avro record schema".to_string(),
404            )
405        })?;
406        match obj.get("type").and_then(|v| v.as_str()) {
407            Some("record") => {}
408            Some(other) => {
409                return Err(ArrowError::AvroError(format!(
410                    "Projected schema must be an Avro record, found type '{other}'"
411                )));
412            }
413            None => {
414                return Err(ArrowError::AvroError(
415                    "Projected schema missing required 'type' field".to_string(),
416                ));
417            }
418        }
419        let fields_val = obj.get_mut("fields").ok_or_else(|| {
420            ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
421        })?;
422        let projected_fields = {
423            let mut original_fields = match fields_val {
424                Value::Array(arr) => std::mem::take(arr),
425                _ => {
426                    return Err(ArrowError::AvroError(
427                        "Avro record schema 'fields' must be an array".to_string(),
428                    ));
429                }
430            };
431            let len = original_fields.len();
432            let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
433            let mut out: Vec<Value> = Vec::with_capacity(projection.len());
434            for &i in projection {
435                if i >= len {
436                    return Err(ArrowError::AvroError(format!(
437                        "Projection index {i} out of bounds for record with {len} fields"
438                    )));
439                }
440                if !seen.insert(i) {
441                    return Err(ArrowError::AvroError(format!(
442                        "Duplicate projection index {i}"
443                    )));
444                }
445                out.push(std::mem::replace(&mut original_fields[i], Value::Null));
446            }
447            out
448        };
449        *fields_val = Value::Array(projected_fields);
450        let json_string = serde_json::to_string(&value).map_err(|e| {
451            ArrowError::AvroError(format!(
452                "Failed to serialize projected Avro schema JSON: {e}"
453            ))
454        })?;
455        Ok(Self::new(json_string))
456    }
457
458    pub(crate) fn generate_fingerprint(
459        schema: &Schema,
460        hash_type: FingerprintAlgorithm,
461    ) -> Result<Fingerprint, ArrowError> {
462        let canonical = Self::generate_canonical_form(schema).map_err(|e| {
463            ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
464        })?;
465        match hash_type {
466            FingerprintAlgorithm::Rabin => {
467                Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
468            }
469            FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
470                "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
471                if using Fingerprint::Id, pass the registry ID in instead using the set method."
472                    .to_string(),
473            )),
474            #[cfg(feature = "md5")]
475            FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
476            #[cfg(feature = "sha256")]
477            FingerprintAlgorithm::SHA256 => {
478                Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
479            }
480        }
481    }
482
    /// Generates the Parsed Canonical Form for the given `Schema`.
    ///
    /// The canonical form is a standardized JSON representation of the schema,
    /// primarily used for generating a schema fingerprint for equality checking.
    ///
    /// This form strips attributes that do not affect the schema's identity,
    /// such as `doc` fields, `aliases`, and any properties not defined in the
    /// Avro specification.
    ///
    /// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
    ///
    /// # Errors
    /// Propagates any error returned by `build_canonical`.
    pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
        // `None` is passed as the second argument for the root schema.
        build_canonical(schema, None)
    }
496
497    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order and optionally stripping internal Arrow metadata.
498    ///
499    /// If the input Arrow schema already contains Avro JSON in
500    /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
501    /// the exact header encoding alignment; otherwise, a new JSON is generated
502    /// honoring `null_union_order` at **all nullable sites**.
503    pub(crate) fn from_arrow_with_options(
504        schema: &ArrowSchema,
505        options: Option<AvroSchemaOptions>,
506    ) -> Result<AvroSchema, ArrowError> {
507        let opts = options.unwrap_or_default();
508        let order = opts.null_order.unwrap_or_default();
509        let strip = opts.strip_metadata;
510        if !strip {
511            if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
512                return Ok(AvroSchema::new(json.clone()));
513            }
514        }
515        let mut name_gen = NameGenerator::default();
516        let fields_json = schema
517            .fields()
518            .iter()
519            .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
520            .collect::<Result<Vec<_>, _>>()?;
521        let record_name = schema
522            .metadata
523            .get(AVRO_NAME_METADATA_KEY)
524            .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
525        let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
526        record.insert("type".into(), Value::String("record".into()));
527        record.insert(
528            "name".into(),
529            Value::String(sanitise_avro_name(record_name)),
530        );
531        if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
532            record.insert("namespace".into(), Value::String(ns.clone()));
533        }
534        if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
535            record.insert("doc".into(), Value::String(doc.clone()));
536        }
537        record.insert("fields".into(), Value::Array(fields_json));
538        extend_with_passthrough_metadata(&mut record, &schema.metadata);
539        let json_string = serde_json::to_string(&Value::Object(record))
540            .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
541        Ok(AvroSchema::new(json_string))
542    }
543}
544
545/// A stack-allocated, fixed-size buffer for the prefix.
546#[derive(Debug, Copy, Clone)]
547pub(crate) struct Prefix {
548    buf: [u8; MAX_PREFIX_LEN],
549    len: u8,
550}
551
552impl Prefix {
553    #[inline]
554    pub(crate) fn as_slice(&self) -> &[u8] {
555        &self.buf[..self.len as usize]
556    }
557}
558
/// Defines the strategy for generating the per-record prefix for an Avro binary stream.
///
/// Hash-based variants carry no payload, whereas the registry variants carry
/// the ID to use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// Use the 64-bit Rabin fingerprint (default for single-object encoding).
    #[default]
    Rabin,
    /// Use a Confluent Schema Registry 32-bit ID.
    Id(u32),
    /// Use an Apicurio Schema Registry 64-bit ID.
    Id64(u64),
    #[cfg(feature = "md5")]
    /// Use the 128-bit MD5 fingerprint.
    MD5,
    #[cfg(feature = "sha256")]
    /// Use the 256-bit SHA-256 fingerprint.
    SHA256,
}
576
577impl From<Fingerprint> for FingerprintStrategy {
578    fn from(f: Fingerprint) -> Self {
579        Self::from(&f)
580    }
581}
582
583impl From<FingerprintAlgorithm> for FingerprintStrategy {
584    fn from(f: FingerprintAlgorithm) -> Self {
585        match f {
586            FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
587            FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
588            FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
589            #[cfg(feature = "md5")]
590            FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
591            #[cfg(feature = "sha256")]
592            FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
593        }
594    }
595}
596
597impl From<&Fingerprint> for FingerprintStrategy {
598    fn from(f: &Fingerprint) -> Self {
599        match f {
600            Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
601            Fingerprint::Id(_) => FingerprintStrategy::Id(0),
602            Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
603            #[cfg(feature = "md5")]
604            Fingerprint::MD5(_) => FingerprintStrategy::MD5,
605            #[cfg(feature = "sha256")]
606            Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
607        }
608    }
609}
610
/// Supported fingerprint algorithms for Avro schema identification.
///
/// For Schema Registry IDs, which are not derived from a hash, use
/// [`FingerprintAlgorithm::Id`] (32-bit) or [`FingerprintAlgorithm::Id64`] (64-bit).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
    #[default]
    Rabin,
    /// Represents a 32 bit fingerprint not based on a hash algorithm, (e.g., a 32-bit Schema Registry ID.)
    Id,
    /// Represents a 64 bit fingerprint not based on a hash algorithm, (e.g., a 64-bit Schema Registry ID.)
    Id64,
    #[cfg(feature = "md5")]
    /// 128-bit MD5 message digest.
    MD5,
    #[cfg(feature = "sha256")]
    /// 256-bit SHA-256 digest.
    SHA256,
}
629
630/// Allow easy extraction of the algorithm used to create a fingerprint.
631impl From<&Fingerprint> for FingerprintAlgorithm {
632    fn from(fp: &Fingerprint) -> Self {
633        match fp {
634            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
635            Fingerprint::Id(_) => FingerprintAlgorithm::Id,
636            Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
637            #[cfg(feature = "md5")]
638            Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
639            #[cfg(feature = "sha256")]
640            Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
641        }
642    }
643}
644
645impl From<FingerprintStrategy> for FingerprintAlgorithm {
646    fn from(s: FingerprintStrategy) -> Self {
647        Self::from(&s)
648    }
649}
650
651impl From<&FingerprintStrategy> for FingerprintAlgorithm {
652    fn from(s: &FingerprintStrategy) -> Self {
653        match s {
654            FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
655            FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
656            FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
657            #[cfg(feature = "md5")]
658            FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
659            #[cfg(feature = "sha256")]
660            FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
661        }
662    }
663}
664
/// A schema fingerprint in one of the supported formats.
///
/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
/// instance always stores only one variant, matching its configured
/// `FingerprintAlgorithm`, but the enum makes the API uniform.
///
/// The `MD5` and `SHA256` variants exist only when the `md5` and `sha256`
/// features are enabled.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
/// <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// A 64-bit Rabin fingerprint.
    Rabin(u64),
    /// A 32-bit Schema Registry ID.
    Id(u32),
    /// A 64-bit Schema Registry ID.
    Id64(u64),
    #[cfg(feature = "md5")]
    /// A 128-bit MD5 fingerprint.
    MD5([u8; 16]),
    #[cfg(feature = "sha256")]
    /// A 256-bit SHA-256 fingerprint.
    SHA256([u8; 32]),
}
688
689impl From<FingerprintStrategy> for Fingerprint {
690    fn from(s: FingerprintStrategy) -> Self {
691        Self::from(&s)
692    }
693}
694
695impl From<&FingerprintStrategy> for Fingerprint {
696    fn from(s: &FingerprintStrategy) -> Self {
697        match s {
698            FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
699            FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
700            FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
701            #[cfg(feature = "md5")]
702            FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
703            #[cfg(feature = "sha256")]
704            FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
705        }
706    }
707}
708
709impl From<FingerprintAlgorithm> for Fingerprint {
710    fn from(s: FingerprintAlgorithm) -> Self {
711        match s {
712            FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
713            FingerprintAlgorithm::Id => Fingerprint::Id(0),
714            FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
715            #[cfg(feature = "md5")]
716            FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
717            #[cfg(feature = "sha256")]
718            FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
719        }
720    }
721}
722
impl Fingerprint {
    /// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID).
    ///
    /// The provided `id` is in big-endian wire order; this converts it to host order
    /// and returns `Fingerprint::Id`.
    ///
    /// # Returns
    /// A `Fingerprint::Id` variant containing the 32-bit fingerprint.
    pub fn load_fingerprint_id(id: u32) -> Self {
        Fingerprint::Id(u32::from_be(id))
    }

    /// Loads the 64-bit Schema Registry fingerprint (Apicurio Schema Registry ID).
    ///
    /// The provided `id` is in big-endian wire order; this converts it to host order
    /// and returns `Fingerprint::Id64`.
    ///
    /// # Returns
    /// A `Fingerprint::Id64` variant containing the 64-bit fingerprint.
    pub fn load_fingerprint_id64(id: u64) -> Self {
        Fingerprint::Id64(u64::from_be(id))
    }

    /// Constructs the serialized per-record prefix for this fingerprint as a
    /// stack-allocated [`Prefix`].
    ///
    /// This method serializes data in different formats depending on the variant of `self`:
    /// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
    ///   followed by the big-endian byte representation of the `id`.
    /// - **`Id64(id)`**: Uses the Apicurio wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
    ///   followed by the big-endian 8-byte representation of the `id`.
    /// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header
    ///   (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`.
    /// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the
    ///   `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
    /// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is
    ///   a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`.
    ///
    /// # Returns
    ///
    /// A `Prefix` containing the serialized prefix data; `Prefix::as_slice`
    /// yields exactly the magic bytes followed by the payload.
    ///
    /// # Features
    ///
    /// - You can optionally enable the `md5` feature to include the `MD5` variant.
    /// - You can optionally enable the `sha256` feature to include the `SHA256` variant.
    ///
    pub(crate) fn make_prefix(&self) -> Prefix {
        // Zero-initialized scratch buffer; `write_prefix` fills the front and
        // reports how many bytes it wrote.
        let mut buf = [0u8; MAX_PREFIX_LEN];
        let len = match self {
            Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
            Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
            Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
            #[cfg(feature = "md5")]
            Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
            #[cfg(feature = "sha256")]
            Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
        };
        Prefix { buf, len }
    }
}
783
784fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
785    buf: &mut [u8; MAX_PREFIX_LEN],
786    magic: &[u8; MAGIC_LEN],
787    payload: &[u8; PAYLOAD_LEN],
788) -> u8 {
789    debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
790    let total = MAGIC_LEN + PAYLOAD_LEN;
791    let prefix_slice = &mut buf[..total];
792    prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
793    prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
794    total as u8
795}
796
/// An in-memory cache of Avro schemas, indexed by their fingerprint.
///
/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
/// Each schema is associated with a unique [`Fingerprint`], which is generated based
/// on the schema's canonical form and a specific hashing algorithm.
///
/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin,
/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations.
/// This ensures consistency when generating fingerprints and looking up schemas.
/// All schemas registered will have their fingerprint computed with this algorithm, and
/// lookups must use a matching fingerprint.
///
/// A fingerprint maps to at most one schema: storing a *different* schema under a
/// fingerprint that is already present is rejected with an error, while storing an
/// equal schema again leaves the store unchanged.
///
/// # Examples
///
/// ```no_run
/// // Create a new store with the default Rabin fingerprinting.
/// use arrow_avro::schema::{AvroSchema, SchemaStore};
///
/// let mut store = SchemaStore::new();
/// let schema = AvroSchema::new("\"string\"".to_string());
/// // Register the schema to get its fingerprint.
/// let fingerprint = store.register(schema.clone()).unwrap();
/// // Use the fingerprint to look up the schema.
/// let retrieved_schema = store.lookup(&fingerprint).cloned();
/// assert_eq!(retrieved_schema, Some(schema));
/// ```
// NOTE(review): this file contains feature-gated `MD5`/`SHA256` fingerprint code paths,
// so the "not yet supported" wording above may be stale -- confirm against
// `AvroSchema::generate_fingerprint` before relying on it.
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// The hashing algorithm used for generating fingerprints.
    fingerprint_algorithm: FingerprintAlgorithm,
    /// A map from a schema's fingerprint to the schema itself.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
830
831impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
832    type Error = ArrowError;
833
834    /// Creates a `SchemaStore` from a HashMap of schemas.
835    /// Each schema in the HashMap is registered with the new store.
836    fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
837        Ok(Self {
838            schemas,
839            ..Self::default()
840        })
841    }
842}
843
844impl SchemaStore {
845    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
846    pub fn new() -> Self {
847        Self::default()
848    }
849
850    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
851    pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
852        Self {
853            fingerprint_algorithm,
854            ..Self::default()
855        }
856    }
857
858    /// Registers a schema with the store and the provided fingerprint.
859    /// Note: Confluent wire format implementations should leverage this method.
860    ///
861    /// A schema is set in the store, using the provided fingerprint. If a schema
862    /// with the same fingerprint does not already exist in the store, the new schema
863    /// is inserted. If the fingerprint already exists, the existing schema is not overwritten.
864    ///
865    /// # Arguments
866    ///
867    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to register.
868    /// * `schema` - The `AvroSchema` to register.
869    ///
870    /// # Returns
871    ///
872    /// A `Result` returning the provided `Fingerprint` of the schema if successful,
873    /// or an `ArrowError` on failure.
874    pub fn set(
875        &mut self,
876        fingerprint: Fingerprint,
877        schema: AvroSchema,
878    ) -> Result<Fingerprint, ArrowError> {
879        match self.schemas.entry(fingerprint) {
880            Entry::Occupied(entry) => {
881                if entry.get() != &schema {
882                    return Err(ArrowError::ComputeError(format!(
883                        "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
884                    )));
885                }
886            }
887            Entry::Vacant(entry) => {
888                entry.insert(schema);
889            }
890        }
891        Ok(fingerprint)
892    }
893
894    /// Registers a schema with the store and returns its fingerprint.
895    ///
896    /// A fingerprint is calculated for the given schema using the store's configured
897    /// hash type. If a schema with the same fingerprint does not already exist in the
898    /// store, the new schema is inserted. If the fingerprint already exists, the
899    /// existing schema is not overwritten. If FingerprintAlgorithm is set to Id or Id64, this
900    /// method will return an error. Confluent wire format implementations should leverage the
901    /// set method instead.
902    ///
903    /// # Arguments
904    ///
905    /// * `schema` - The `AvroSchema` to register.
906    ///
907    /// # Returns
908    ///
909    /// A `Result` containing the `Fingerprint` of the schema if successful,
910    /// or an `ArrowError` on failure.
911    pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
912        if self.fingerprint_algorithm == FingerprintAlgorithm::Id
913            || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
914        {
915            return Err(ArrowError::SchemaError(
916                "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
917            Use the set method directly instead, providing a valid fingerprint"
918                    .to_string(),
919            ));
920        }
921        let fingerprint =
922            AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
923        self.set(fingerprint, schema)?;
924        Ok(fingerprint)
925    }
926
927    /// Looks up a schema by its `Fingerprint`.
928    ///
929    /// # Arguments
930    ///
931    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up.
932    ///
933    /// # Returns
934    ///
935    /// An `Option` containing a clone of the `AvroSchema` if found, otherwise `None`.
936    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
937        self.schemas.get(fingerprint)
938    }
939
940    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
941    /// held by this [`SchemaStore`].
942    ///
943    /// The order of the returned fingerprints is unspecified and should not be
944    /// relied upon.
945    pub fn fingerprints(&self) -> Vec<Fingerprint> {
946        self.schemas.keys().copied().collect()
947    }
948
949    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
950    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
951        self.fingerprint_algorithm
952    }
953}
954
955fn quote(s: &str) -> Result<String, ArrowError> {
956    serde_json::to_string(s)
957        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
958}
959
// Avro names are defined by a `name` and an optional `namespace`.
// The full name is composed of the namespace and the name, separated by a dot.
//
// Avro specification defines two ways to specify a full name:
// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
//    In this case, the `namespace` attribute is ignored.
// 2. The `name` attribute contains the simple name (e.g., "d") and the
//    `namespace` attribute contains the namespace (e.g., "a.b.c").
//
// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
// Complex paths with quotes or backticks like `a."hi".b` are not supported.
//
// This function constructs the full name and extracts the namespace,
// handling both ways of specifying the name. It prioritizes a namespace
// defined within the `name` attribute itself, then the explicit `namespace_attr`,
// and finally the `enclosing_ns`.
//
// The empty string is a valid namespace value that denotes the *null namespace*.
// When the effective namespace is empty, the full name is the bare `name` (no
// leading dot) and the returned namespace is `Some("")`, so callers that pass
// it on as the enclosing namespace keep nested names in the null namespace.
//
// Returns `(full_name, namespace)`; `namespace` is `None` only when no
// namespace could be determined from any of the three sources.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    // `name` already contains a dot then treat as full-name, ignore namespace.
    if let Some((ns, _)) = name.rsplit_once('.') {
        return (name.to_string(), Some(ns.to_string()));
    }
    match namespace_attr.or(enclosing_ns) {
        // Null namespace: joining with a dot would produce the invalid name ".name".
        Some(ns) if ns.is_empty() => (name.to_string(), Some(String::new())),
        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
        None => (name.to_string(), None),
    }
}
990
991fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
992    Ok(match schema {
993        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
994            TypeName::Primitive(pt) => quote(pt.as_ref())?,
995            TypeName::Ref(name) => {
996                let (full_name, _) = make_full_name(name, None, enclosing_ns);
997                quote(&full_name)?
998            }
999        },
1000        Schema::Union(branches) => format!(
1001            "[{}]",
1002            branches
1003                .iter()
1004                .map(|b| build_canonical(b, enclosing_ns))
1005                .collect::<Result<Vec<_>, _>>()?
1006                .join(",")
1007        ),
1008        Schema::Complex(ct) => match ct {
1009            ComplexType::Record(r) => {
1010                let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
1011                let fields = r
1012                    .fields
1013                    .iter()
1014                    .map(|f| {
1015                        // PCF [STRIP] per Avro spec: keep only attributes relevant to parsing
1016                        // ("name" and "type" for fields) and **strip others** such as doc,
1017                        // default, order, and **aliases**. This preserves canonicalization. See:
1018                        // https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas
1019                        let field_type =
1020                            build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
1021                        Ok(format!(
1022                            r#"{{"name":{},"type":{}}}"#,
1023                            quote(f.name)?,
1024                            field_type
1025                        ))
1026                    })
1027                    .collect::<Result<Vec<_>, ArrowError>>()?
1028                    .join(",");
1029                format!(
1030                    r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
1031                    quote(&full_name)?,
1032                )
1033            }
1034            ComplexType::Enum(e) => {
1035                let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
1036                let symbols = e
1037                    .symbols
1038                    .iter()
1039                    .map(|s| quote(s))
1040                    .collect::<Result<Vec<_>, _>>()?
1041                    .join(",");
1042                format!(
1043                    r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
1044                    quote(&full_name)?
1045                )
1046            }
1047            ComplexType::Array(arr) => format!(
1048                r#"{{"type":"array","items":{}}}"#,
1049                build_canonical(&arr.items, enclosing_ns)?
1050            ),
1051            ComplexType::Map(map) => format!(
1052                r#"{{"type":"map","values":{}}}"#,
1053                build_canonical(&map.values, enclosing_ns)?
1054            ),
1055            ComplexType::Fixed(f) => {
1056                let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
1057                format!(
1058                    r#"{{"name":{},"type":"fixed","size":{}}}"#,
1059                    quote(&full_name)?,
1060                    f.size
1061                )
1062            }
1063        },
1064    })
1065}
1066
/// Seed constant of the 64‑bit Rabin fingerprint as described in the Avro spec.
///
/// It plays two roles in this file: `compute_fingerprint_rabin` starts its running
/// value from it (so it is also the fingerprint of empty input), and `one_entry`
/// XORs it in while building the lookup table.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;
1069
1070/// Build one entry of the polynomial‑division table.
1071///
1072/// We cannot yet write `for _ in 0..8` here: `for` loops rely on
1073/// `Iterator::next`, which is not `const` on stable Rust.  Until the
1074/// `const_for` feature (tracking issue #87575) is stabilized, a `while`
1075/// loop is the only option in a `const fn`
1076const fn one_entry(i: usize) -> u64 {
1077    let mut fp = i as u64;
1078    let mut j = 0;
1079    while j < 8 {
1080        fp = (fp >> 1) ^ (EMPTY & (0u64.wrapping_sub(fp & 1)));
1081        j += 1;
1082    }
1083    fp
1084}
1085
1086/// Build the full 256‑entry table at compile time.
1087///
1088/// We cannot yet write `for _ in 0..256` here: `for` loops rely on
1089/// `Iterator::next`, which is not `const` on stable Rust.  Until the
1090/// `const_for` feature (tracking issue #87575) is stabilized, a `while`
1091/// loop is the only option in a `const fn`
1092const fn build_table() -> [u64; 256] {
1093    let mut table = [0u64; 256];
1094    let mut i = 0;
1095    while i < 256 {
1096        table[i] = one_entry(i);
1097        i += 1;
1098    }
1099    table
1100}
1101
/// The pre‑computed table.
///
/// Holds 256 entries (one per possible byte value) produced by `build_table`;
/// `compute_fingerprint_rabin` performs one lookup in it per input byte.
static FINGERPRINT_TABLE: [u64; 256] = build_table();
1104
1105/// Computes the 64-bit Rabin fingerprint for a given canonical schema string.
1106/// This implementation is based on the Avro specification for schema fingerprinting.
1107pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
1108    let mut fp = EMPTY;
1109    for &byte in canonical_form.as_bytes() {
1110        let idx = ((fp as u8) ^ byte) as usize;
1111        fp = (fp >> 8) ^ FINGERPRINT_TABLE[idx];
1112    }
1113    fp
1114}
1115
#[cfg(feature = "md5")]
/// Compute the **128‑bit MD5** fingerprint of the canonical form.
///
/// Hashes the UTF-8 bytes of `canonical_form` and returns the full digest as a
/// 16‑byte array (`[u8; 16]`).
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    // `md5::Digest` is a thin wrapper around the raw 16 digest bytes.
    let md5::Digest(bytes) = md5::compute(canonical_form.as_bytes());
    bytes
}
1126
#[cfg(feature = "sha256")]
/// Compute the **256‑bit SHA‑256** fingerprint of the canonical form.
///
/// Hashes the UTF-8 bytes of `canonical_form` and returns the full digest as a
/// 32‑byte array (`[u8; 32]`).
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    // One-shot form of "create hasher, update, finalize".
    Sha256::digest(canonical_form.as_bytes()).into()
}
1138
1139#[inline]
1140fn is_internal_arrow_key(key: &str) -> bool {
1141    key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1142}
1143
1144/// Copies Arrow schema metadata entries to the provided JSON map,
1145/// skipping keys that are Avro-reserved, internal Arrow keys, or
1146/// nested under the `avro.schema.` namespace. Values that parse as
1147/// JSON are inserted as JSON; otherwise the raw string is preserved.
1148fn extend_with_passthrough_metadata(
1149    target: &mut JsonMap<String, Value>,
1150    metadata: &HashMap<String, String>,
1151) {
1152    for (meta_key, meta_val) in metadata {
1153        if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1154            continue;
1155        }
1156        let json_val =
1157            serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1158        target.insert(meta_key.clone(), json_val);
1159    }
1160}
1161
// Sanitize an arbitrary string so it is a valid Avro field or type name:
// - an empty input becomes "_";
// - every character other than an ASCII letter, ASCII digit or '_' becomes '_';
// - a '_' is prepended when the input starts with an ASCII digit.
fn sanitise_avro_name(base_name: &str) -> String {
    if base_name.is_empty() {
        return "_".to_owned();
    }
    // Room for every input byte plus a possible leading underscore.
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    if base_name.starts_with(|first: char| first.is_ascii_digit()) {
        sanitised.push('_');
    }
    for c in base_name.chars() {
        let keep = c.is_ascii_alphanumeric() || c == '_';
        sanitised.push(if keep { c } else { '_' });
    }
    sanitised
}
1182
1183#[derive(Default)]
1184struct NameGenerator {
1185    used: HashSet<String>,
1186    counters: HashMap<String, usize>,
1187}
1188
1189impl NameGenerator {
1190    fn make_unique(&mut self, field_name: &str) -> String {
1191        let field_name = sanitise_avro_name(field_name);
1192        if self.used.insert(field_name.clone()) {
1193            self.counters.insert(field_name.clone(), 1);
1194            return field_name;
1195        }
1196        let counter = self.counters.entry(field_name.clone()).or_insert(1);
1197        loop {
1198            let candidate = format!("{field_name}_{}", *counter);
1199            if self.used.insert(candidate.clone()) {
1200                return candidate;
1201            }
1202            *counter += 1;
1203        }
1204    }
1205}
1206
1207fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1208    if extras.is_empty() {
1209        return schema;
1210    }
1211    match schema {
1212        Value::Object(mut map) => {
1213            map.extend(extras);
1214            Value::Object(map)
1215        }
1216        Value::Array(mut union) => {
1217            // For unions, we cannot attach attributes to the array itself (per Avro spec).
1218            // As a fallback for extension metadata, attach extras to the first non-null branch object.
1219            if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1220                let original = std::mem::take(non_null);
1221                *non_null = merge_extras(original, extras);
1222            }
1223            Value::Array(union)
1224        }
1225        primitive => {
1226            let mut map = JsonMap::with_capacity(extras.len() + 1);
1227            map.insert("type".into(), primitive);
1228            map.extend(extras);
1229            Value::Object(map)
1230        }
1231    }
1232}
1233
1234#[inline]
1235fn is_avro_json_null(v: &Value) -> bool {
1236    matches!(v, Value::String(s) if s == "null")
1237}
1238
1239fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1240    let null = Value::String("null".into());
1241    match inner {
1242        Value::Array(mut union) => {
1243            // If this site is already a union and already contains "null",
1244            // preserve the branch order exactly. Reordering "null" breaks
1245            // the correspondence between Arrow union child order (type_ids)
1246            // and the Avro branch index written on the wire.
1247            if union.iter().any(is_avro_json_null) {
1248                return Value::Array(union);
1249            }
1250            // Otherwise, inject "null" without reordering existing branches.
1251            match null_order {
1252                Nullability::NullFirst => union.insert(0, null),
1253                Nullability::NullSecond => union.push(null),
1254            }
1255            Value::Array(union)
1256        }
1257        other => match null_order {
1258            Nullability::NullFirst => Value::Array(vec![null, other]),
1259            Nullability::NullSecond => Value::Array(vec![other, null]),
1260        },
1261    }
1262}
1263
/// Returns the smallest byte width (1..=32) whose maximum decimal precision is at
/// least `p`, saturating at 32 bytes when `p` exceeds every entry of the table.
fn min_fixed_bytes_for_precision(p: usize) -> usize {
    // From the spec: max precision for n=1..=32 bytes:
    // [2,4,6,9,11,14,16,18,21,23,26,28,31,33,35,38,40,43,45,47,50,52,55,57,59,62,64,67,69,71,74,76]
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    // Index `idx` of the table corresponds to a width of `idx + 1` bytes.
    MAX_P
        .iter()
        .position(|&max_p| p <= max_p)
        .map_or(32, |idx| idx + 1) // saturate at Decimal256
}
1278
1279fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1280    match branch {
1281        Value::String(t) => Ok(format!("P:{t}")),
1282        Value::Object(map) => {
1283            let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1284                ArrowError::SchemaError("Union branch object missing string 'type'".into())
1285            })?;
1286            match t {
1287                "record" | "enum" | "fixed" => {
1288                    let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1289                        ArrowError::SchemaError(format!(
1290                            "Union branch '{t}' missing required 'name'"
1291                        ))
1292                    })?;
1293                    Ok(format!("N:{t}:{name}"))
1294                }
1295                "array" | "map" => Ok(format!("C:{t}")),
1296                other => Ok(format!("P:{other}")),
1297            }
1298        }
1299        Value::Array(_) => Err(ArrowError::SchemaError(
1300            "Avro union may not immediately contain another union".into(),
1301        )),
1302        _ => Err(ArrowError::SchemaError(
1303            "Invalid JSON for Avro union branch".into(),
1304        )),
1305    }
1306}
1307
1308fn datatype_to_avro(
1309    dt: &DataType,
1310    field_name: &str,
1311    metadata: &HashMap<String, String>,
1312    name_gen: &mut NameGenerator,
1313    null_order: Nullability,
1314    strip: bool,
1315) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1316    let mut extras = JsonMap::new();
1317    let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1318        if *scale < 0 {
1319            return Err(ArrowError::SchemaError(format!(
1320                "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1321            )));
1322        }
1323        if (*scale as usize) > (*precision as usize) {
1324            return Err(ArrowError::SchemaError(format!(
1325                "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1326                 must be <= precision ({precision})"
1327            )));
1328        }
1329        let mut meta = JsonMap::from_iter([
1330            ("logicalType".into(), json!("decimal")),
1331            ("precision".into(), json!(*precision)),
1332            ("scale".into(), json!(*scale)),
1333        ]);
1334        let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1335        let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1336            || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1337        if fixed_size.is_none() && carries_name {
1338            fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1339        }
1340        if let Some(size) = fixed_size {
1341            meta.insert("type".into(), json!("fixed"));
1342            meta.insert("size".into(), json!(size));
1343            let chosen_name = metadata
1344                .get(AVRO_NAME_METADATA_KEY)
1345                .map(|s| sanitise_avro_name(s))
1346                .unwrap_or_else(|| name_gen.make_unique(field_name));
1347            meta.insert("name".into(), json!(chosen_name));
1348            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1349                meta.insert("namespace".into(), json!(ns));
1350            }
1351        } else {
1352            // default to bytes-backed decimal
1353            meta.insert("type".into(), json!("bytes"));
1354        }
1355        Ok(Value::Object(meta))
1356    };
1357    let val = match dt {
1358        DataType::Null => Value::String("null".into()),
1359        DataType::Boolean => Value::String("boolean".into()),
1360        #[cfg(not(feature = "avro_custom_types"))]
1361        DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 => {
1362            Value::String("int".into())
1363        }
1364        DataType::Int32 => Value::String("int".into()),
1365        #[cfg(feature = "avro_custom_types")]
1366        DataType::Int8 => json!({ "type": "int", "logicalType": "arrow.int8" }),
1367        #[cfg(feature = "avro_custom_types")]
1368        DataType::Int16 => json!({ "type": "int", "logicalType": "arrow.int16" }),
1369        #[cfg(feature = "avro_custom_types")]
1370        DataType::UInt8 => json!({ "type": "int", "logicalType": "arrow.uint8" }),
1371        #[cfg(feature = "avro_custom_types")]
1372        DataType::UInt16 => json!({ "type": "int", "logicalType": "arrow.uint16" }),
1373        #[cfg(not(feature = "avro_custom_types"))]
1374        DataType::UInt32 => Value::String("long".into()),
1375        #[cfg(feature = "avro_custom_types")]
1376        DataType::UInt32 => json!({ "type": "long", "logicalType": "arrow.uint32" }),
1377        DataType::Int64 => Value::String("long".into()),
1378        #[cfg(not(feature = "avro_custom_types"))]
1379        DataType::UInt64 => Value::String("long".into()),
1380        #[cfg(feature = "avro_custom_types")]
1381        DataType::UInt64 => {
1382            // UInt64 must use fixed(8) to avoid overflow
1383            let chosen_name = metadata
1384                .get(AVRO_NAME_METADATA_KEY)
1385                .map(|s| sanitise_avro_name(s))
1386                .unwrap_or_else(|| name_gen.make_unique(field_name));
1387            let mut obj = JsonMap::from_iter([
1388                ("type".into(), json!("fixed")),
1389                ("name".into(), json!(chosen_name)),
1390                ("size".into(), json!(8)),
1391                ("logicalType".into(), json!("arrow.uint64")),
1392            ]);
1393            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1394                obj.insert("namespace".into(), json!(ns));
1395            }
1396            json!(obj)
1397        }
1398        #[cfg(not(feature = "avro_custom_types"))]
1399        DataType::Float16 => Value::String("float".into()),
1400        #[cfg(feature = "avro_custom_types")]
1401        DataType::Float16 => {
1402            // Float16 uses fixed(2) for IEEE-754 bits
1403            let chosen_name = metadata
1404                .get(AVRO_NAME_METADATA_KEY)
1405                .map(|s| sanitise_avro_name(s))
1406                .unwrap_or_else(|| name_gen.make_unique(field_name));
1407            let mut obj = JsonMap::from_iter([
1408                ("type".into(), json!("fixed")),
1409                ("name".into(), json!(chosen_name)),
1410                ("size".into(), json!(2)),
1411                ("logicalType".into(), json!("arrow.float16")),
1412            ]);
1413            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1414                obj.insert("namespace".into(), json!(ns));
1415            }
1416            json!(obj)
1417        }
1418        DataType::Float32 => Value::String("float".into()),
1419        DataType::Float64 => Value::String("double".into()),
1420        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1421        DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1422        DataType::BinaryView => {
1423            if !strip {
1424                extras.insert("arrowBinaryView".into(), Value::Bool(true));
1425            }
1426            Value::String("bytes".into())
1427        }
1428        DataType::FixedSizeBinary(len) => {
1429            let md_is_uuid = metadata
1430                .get("logicalType")
1431                .map(|s| s.trim_matches('"') == "uuid")
1432                .unwrap_or(false);
1433            #[cfg(feature = "canonical_extension_types")]
1434            let ext_is_uuid = metadata
1435                .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1436                .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1437                .unwrap_or(false);
1438            #[cfg(not(feature = "canonical_extension_types"))]
1439            let ext_is_uuid = false;
1440            let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1441            if is_uuid {
1442                json!({ "type": "string", "logicalType": "uuid" })
1443            } else {
1444                let chosen_name = metadata
1445                    .get(AVRO_NAME_METADATA_KEY)
1446                    .map(|s| sanitise_avro_name(s))
1447                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1448                let mut obj = JsonMap::from_iter([
1449                    ("type".into(), json!("fixed")),
1450                    ("name".into(), json!(chosen_name)),
1451                    ("size".into(), json!(len)),
1452                ]);
1453                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1454                    obj.insert("namespace".into(), json!(ns));
1455                }
1456                Value::Object(obj)
1457            }
1458        }
1459        #[cfg(feature = "small_decimals")]
1460        DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1461            handle_decimal(precision, scale)?
1462        }
1463        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1464            handle_decimal(precision, scale)?
1465        }
1466        DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1467        #[cfg(not(feature = "avro_custom_types"))]
1468        DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1469        #[cfg(feature = "avro_custom_types")]
1470        DataType::Date64 => json!({ "type": "long", "logicalType": "arrow.date64" }),
1471        DataType::Time32(unit) => match unit {
1472            TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1473            #[cfg(not(feature = "avro_custom_types"))]
1474            TimeUnit::Second => {
1475                // Encoder converts seconds to milliseconds, so use time-millis
1476                if !strip {
1477                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1478                }
1479                json!({ "type": "int", "logicalType": "time-millis" })
1480            }
1481            #[cfg(feature = "avro_custom_types")]
1482            TimeUnit::Second => {
1483                json!({ "type": "int", "logicalType": "arrow.time32-second" })
1484            }
1485            _ => Value::String("int".into()),
1486        },
1487        DataType::Time64(unit) => match unit {
1488            TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1489            #[cfg(not(feature = "avro_custom_types"))]
1490            TimeUnit::Nanosecond => {
1491                // Encoder truncates nanoseconds to microseconds, so use time-micros
1492                if !strip {
1493                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1494                }
1495                json!({ "type": "long", "logicalType": "time-micros" })
1496            }
1497            #[cfg(feature = "avro_custom_types")]
1498            TimeUnit::Nanosecond => {
1499                json!({ "type": "long", "logicalType": "arrow.time64-nanosecond" })
1500            }
1501            _ => Value::String("long".into()),
1502        },
1503        DataType::Timestamp(unit, tz) => {
1504            #[cfg(feature = "avro_custom_types")]
1505            if matches!(unit, TimeUnit::Second) {
1506                let logical_type = if tz.is_some() {
1507                    "arrow.timestamp-second"
1508                } else {
1509                    "arrow.local-timestamp-second"
1510                };
1511                return Ok((
1512                    json!({ "type": "long", "logicalType": logical_type }),
1513                    extras,
1514                ));
1515            }
1516            let logical_type = match (unit, tz.is_some()) {
1517                (TimeUnit::Millisecond, true) => "timestamp-millis",
1518                (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1519                (TimeUnit::Microsecond, true) => "timestamp-micros",
1520                (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1521                (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1522                (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1523                (TimeUnit::Second, has_tz) => {
1524                    // Encoder converts seconds to milliseconds, so use timestamp-millis
1525                    if !strip {
1526                        extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1527                    }
1528                    let ts_logical_type = if has_tz {
1529                        "timestamp-millis"
1530                    } else {
1531                        "local-timestamp-millis"
1532                    };
1533                    return Ok((
1534                        json!({ "type": "long", "logicalType": ts_logical_type }),
1535                        extras,
1536                    ));
1537                }
1538            };
1539            if !strip && matches!(unit, TimeUnit::Nanosecond) {
1540                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1541            }
1542            json!({ "type": "long", "logicalType": logical_type })
1543        }
1544        #[cfg(not(feature = "avro_custom_types"))]
1545        DataType::Duration(_unit) => Value::String("long".into()),
1546        #[cfg(feature = "avro_custom_types")]
1547        DataType::Duration(unit) => {
1548            // When the feature is enabled, create an Avro schema object
1549            // with the correct `logicalType` annotation.
1550            let logical_type = match unit {
1551                TimeUnit::Second => "arrow.duration-seconds",
1552                TimeUnit::Millisecond => "arrow.duration-millis",
1553                TimeUnit::Microsecond => "arrow.duration-micros",
1554                TimeUnit::Nanosecond => "arrow.duration-nanos",
1555            };
1556            json!({ "type": "long", "logicalType": logical_type })
1557        }
1558        #[cfg(not(feature = "avro_custom_types"))]
1559        DataType::Interval(IntervalUnit::MonthDayNano) => {
1560            // Avro duration logical type: fixed(12) with months/days/millis per spec.
1561            let chosen_name = metadata
1562                .get(AVRO_NAME_METADATA_KEY)
1563                .map(|s| sanitise_avro_name(s))
1564                .unwrap_or_else(|| name_gen.make_unique(field_name));
1565            let mut obj = JsonMap::from_iter([
1566                ("type".into(), json!("fixed")),
1567                ("name".into(), json!(chosen_name)),
1568                ("size".into(), json!(12)),
1569                ("logicalType".into(), json!("duration")),
1570            ]);
1571            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1572                obj.insert("namespace".into(), json!(ns));
1573            }
1574            json!(obj)
1575        }
1576        #[cfg(feature = "avro_custom_types")]
1577        DataType::Interval(IntervalUnit::MonthDayNano) => {
1578            // Arrow MonthDayNano interval: i32 months + i32 days + i64 nanos (16 bytes).
1579            // We preserve the Arrow native representation via a custom logical type.
1580            let chosen_name = metadata
1581                .get(AVRO_NAME_METADATA_KEY)
1582                .map(|s| sanitise_avro_name(s))
1583                .unwrap_or_else(|| name_gen.make_unique(field_name));
1584            let mut obj = JsonMap::from_iter([
1585                ("type".into(), json!("fixed")),
1586                ("name".into(), json!(chosen_name)),
1587                ("size".into(), json!(16)),
1588                ("logicalType".into(), json!("arrow.interval-month-day-nano")),
1589            ]);
1590            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1591                obj.insert("namespace".into(), json!(ns));
1592            }
1593            json!(obj)
1594        }
1595        #[cfg(not(feature = "avro_custom_types"))]
1596        DataType::Interval(IntervalUnit::YearMonth) => {
1597            // Encode as Avro `duration` (fixed(12)) like MonthDayNano
1598            let chosen_name = metadata
1599                .get(AVRO_NAME_METADATA_KEY)
1600                .map(|s| sanitise_avro_name(s))
1601                .unwrap_or_else(|| name_gen.make_unique(field_name));
1602            let mut extras = JsonMap::from_iter([
1603                ("type".into(), json!("fixed")),
1604                ("name".into(), json!(chosen_name)),
1605                ("size".into(), json!(12)),
1606                ("logicalType".into(), json!("duration")),
1607            ]);
1608            if !strip {
1609                extras.insert(
1610                    "arrowIntervalUnit".into(),
1611                    Value::String("yearmonth".into()),
1612                );
1613            }
1614            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1615                extras.insert("namespace".into(), json!(ns));
1616            }
1617            json!(extras)
1618        }
1619        #[cfg(feature = "avro_custom_types")]
1620        DataType::Interval(IntervalUnit::YearMonth) => {
1621            let chosen_name = metadata
1622                .get(AVRO_NAME_METADATA_KEY)
1623                .map(|s| sanitise_avro_name(s))
1624                .unwrap_or_else(|| name_gen.make_unique(field_name));
1625            let mut obj = JsonMap::from_iter([
1626                ("type".into(), json!("fixed")),
1627                ("name".into(), json!(chosen_name)),
1628                ("size".into(), json!(4)),
1629                ("logicalType".into(), json!("arrow.interval-year-month")),
1630            ]);
1631            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1632                obj.insert("namespace".into(), json!(ns));
1633            }
1634            json!(obj)
1635        }
1636        #[cfg(not(feature = "avro_custom_types"))]
1637        DataType::Interval(IntervalUnit::DayTime) => {
1638            // Encode as Avro `duration` (fixed(12)) like MonthDayNano
1639            let chosen_name = metadata
1640                .get(AVRO_NAME_METADATA_KEY)
1641                .map(|s| sanitise_avro_name(s))
1642                .unwrap_or_else(|| name_gen.make_unique(field_name));
1643            let mut obj = JsonMap::from_iter([
1644                ("type".into(), json!("fixed")),
1645                ("name".into(), json!(chosen_name)),
1646                ("size".into(), json!(12)),
1647                ("logicalType".into(), json!("duration")),
1648            ]);
1649            if !strip {
1650                obj.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1651            }
1652            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1653                obj.insert("namespace".into(), json!(ns));
1654            }
1655            json!(obj)
1656        }
1657        #[cfg(feature = "avro_custom_types")]
1658        DataType::Interval(IntervalUnit::DayTime) => {
1659            let chosen_name = metadata
1660                .get(AVRO_NAME_METADATA_KEY)
1661                .map(|s| sanitise_avro_name(s))
1662                .unwrap_or_else(|| name_gen.make_unique(field_name));
1663            let mut obj = JsonMap::from_iter([
1664                ("type".into(), json!("fixed")),
1665                ("name".into(), json!(chosen_name)),
1666                ("size".into(), json!(8)),
1667                ("logicalType".into(), json!("arrow.interval-day-time")),
1668            ]);
1669            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1670                obj.insert("namespace".into(), json!(ns));
1671            }
1672            json!(obj)
1673        }
1674        DataType::List(child) | DataType::LargeList(child) => {
1675            if matches!(dt, DataType::LargeList(_)) && !strip {
1676                extras.insert("arrowLargeList".into(), Value::Bool(true));
1677            }
1678            let items_schema = process_datatype(
1679                child.data_type(),
1680                child.name(),
1681                child.metadata(),
1682                name_gen,
1683                null_order,
1684                child.is_nullable(),
1685                strip,
1686            )?;
1687            json!({
1688                "type": "array",
1689                "items": items_schema
1690            })
1691        }
1692        DataType::ListView(child) | DataType::LargeListView(child) => {
1693            if matches!(dt, DataType::LargeListView(_)) && !strip {
1694                extras.insert("arrowLargeList".into(), Value::Bool(true));
1695            }
1696            if !strip {
1697                extras.insert("arrowListView".into(), Value::Bool(true));
1698            }
1699            let items_schema = process_datatype(
1700                child.data_type(),
1701                child.name(),
1702                child.metadata(),
1703                name_gen,
1704                null_order,
1705                child.is_nullable(),
1706                strip,
1707            )?;
1708            json!({
1709                "type": "array",
1710                "items": items_schema
1711            })
1712        }
1713        DataType::FixedSizeList(child, len) => {
1714            if !strip {
1715                extras.insert("arrowFixedSize".into(), json!(len));
1716            }
1717            let items_schema = process_datatype(
1718                child.data_type(),
1719                child.name(),
1720                child.metadata(),
1721                name_gen,
1722                null_order,
1723                child.is_nullable(),
1724                strip,
1725            )?;
1726            json!({
1727                "type": "array",
1728                "items": items_schema
1729            })
1730        }
1731        DataType::Map(entries, _) => {
1732            let value_field = match entries.data_type() {
1733                DataType::Struct(fs) => &fs[1],
1734                _ => {
1735                    return Err(ArrowError::SchemaError(
1736                        "Map 'entries' field must be Struct(key,value)".into(),
1737                    ));
1738                }
1739            };
1740            let values_schema = process_datatype(
1741                value_field.data_type(),
1742                value_field.name(),
1743                value_field.metadata(),
1744                name_gen,
1745                null_order,
1746                value_field.is_nullable(),
1747                strip,
1748            )?;
1749            json!({
1750                "type": "map",
1751                "values": values_schema
1752            })
1753        }
1754        DataType::Struct(fields) => {
1755            let avro_fields = fields
1756                .iter()
1757                .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1758                .collect::<Result<Vec<_>, _>>()?;
1759            // Prefer avro.name/avro.namespace when provided on the struct field metadata
1760            let chosen_name = metadata
1761                .get(AVRO_NAME_METADATA_KEY)
1762                .map(|s| sanitise_avro_name(s))
1763                .unwrap_or_else(|| name_gen.make_unique(field_name));
1764            let mut obj = JsonMap::from_iter([
1765                ("type".into(), json!("record")),
1766                ("name".into(), json!(chosen_name)),
1767                ("fields".into(), Value::Array(avro_fields)),
1768            ]);
1769            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1770                obj.insert("namespace".into(), json!(ns));
1771            }
1772            Value::Object(obj)
1773        }
1774        DataType::Dictionary(_, value) => {
1775            if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1776                let symbols: Vec<&str> =
1777                    serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1778                // Prefer avro.name/namespace when provided for enums
1779                let chosen_name = metadata
1780                    .get(AVRO_NAME_METADATA_KEY)
1781                    .map(|s| sanitise_avro_name(s))
1782                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1783                let mut obj = JsonMap::from_iter([
1784                    ("type".into(), json!("enum")),
1785                    ("name".into(), json!(chosen_name)),
1786                    ("symbols".into(), json!(symbols)),
1787                ]);
1788                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1789                    obj.insert("namespace".into(), json!(ns));
1790                }
1791                Value::Object(obj)
1792            } else {
1793                process_datatype(
1794                    value.as_ref(),
1795                    field_name,
1796                    metadata,
1797                    name_gen,
1798                    null_order,
1799                    false,
1800                    strip,
1801                )?
1802            }
1803        }
1804        #[cfg(feature = "avro_custom_types")]
1805        DataType::RunEndEncoded(run_ends, values) => {
1806            let bits = match run_ends.data_type() {
1807                DataType::Int16 => 16,
1808                DataType::Int32 => 32,
1809                DataType::Int64 => 64,
1810                other => {
1811                    return Err(ArrowError::SchemaError(format!(
1812                        "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1813                    )));
1814                }
1815            };
1816            // Build the value site schema, preserving its own nullability
1817            let (value_schema, value_extras) = datatype_to_avro(
1818                values.data_type(),
1819                values.name(),
1820                values.metadata(),
1821                name_gen,
1822                null_order,
1823                strip,
1824            )?;
1825            let mut merged = merge_extras(value_schema, value_extras);
1826            if values.is_nullable() {
1827                merged = wrap_nullable(merged, null_order);
1828            }
1829            let mut extras = JsonMap::new();
1830            extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1831            extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1832            return Ok((merged, extras));
1833        }
1834        #[cfg(not(feature = "avro_custom_types"))]
1835        DataType::RunEndEncoded(_run_ends, values) => {
1836            let (value_schema, _extras) = datatype_to_avro(
1837                values.data_type(),
1838                values.name(),
1839                values.metadata(),
1840                name_gen,
1841                null_order,
1842                strip,
1843            )?;
1844            return Ok((value_schema, JsonMap::new()));
1845        }
1846        DataType::Union(fields, mode) => {
1847            let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1848            let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1849            for (type_id, field_ref) in fields.iter() {
1850                // NOTE: `process_datatype` would wrap nullability; force is_nullable=false here.
1851                let (branch_schema, _branch_extras) = datatype_to_avro(
1852                    field_ref.data_type(),
1853                    field_ref.name(),
1854                    field_ref.metadata(),
1855                    name_gen,
1856                    null_order,
1857                    strip,
1858                )?;
1859                // Avro unions cannot immediately contain another union
1860                if matches!(branch_schema, Value::Array(_)) {
1861                    return Err(ArrowError::SchemaError(
1862                        "Avro union may not immediately contain another union".into(),
1863                    ));
1864                }
1865                branches.push(branch_schema);
1866                type_ids.push(type_id as i32);
1867            }
1868            let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1869            for b in &branches {
1870                let sig = union_branch_signature(b)?;
1871                if !seen.insert(sig) {
1872                    return Err(ArrowError::SchemaError(
1873                        "Avro union contains duplicate branch types (disallowed by spec)".into(),
1874                    ));
1875                }
1876            }
1877            if !strip {
1878                extras.insert(
1879                    "arrowUnionMode".into(),
1880                    Value::String(
1881                        match mode {
1882                            UnionMode::Sparse => "sparse",
1883                            UnionMode::Dense => "dense",
1884                        }
1885                        .to_string(),
1886                    ),
1887                );
1888                extras.insert(
1889                    "arrowUnionTypeIds".into(),
1890                    Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1891                );
1892            }
1893            Value::Array(branches)
1894        }
1895        #[cfg(not(feature = "small_decimals"))]
1896        other => {
1897            return Err(ArrowError::NotYetImplemented(format!(
1898                "Arrow type {other:?} has no Avro representation"
1899            )));
1900        }
1901    };
1902    Ok((val, extras))
1903}
1904
1905fn process_datatype(
1906    dt: &DataType,
1907    field_name: &str,
1908    metadata: &HashMap<String, String>,
1909    name_gen: &mut NameGenerator,
1910    null_order: Nullability,
1911    is_nullable: bool,
1912    strip: bool,
1913) -> Result<Value, ArrowError> {
1914    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1915    let mut merged = merge_extras(schema, extras);
1916    if is_nullable {
1917        merged = wrap_nullable(merged, null_order)
1918    }
1919    Ok(merged)
1920}
1921
1922fn arrow_field_to_avro(
1923    field: &ArrowField,
1924    name_gen: &mut NameGenerator,
1925    null_order: Nullability,
1926    strip: bool,
1927) -> Result<Value, ArrowError> {
1928    let avro_name = sanitise_avro_name(field.name());
1929    let schema_value = process_datatype(
1930        field.data_type(),
1931        &avro_name,
1932        field.metadata(),
1933        name_gen,
1934        null_order,
1935        field.is_nullable(),
1936        strip,
1937    )?;
1938    // Build the field map
1939    let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1940    map.insert("name".into(), Value::String(avro_name));
1941    map.insert("type".into(), schema_value);
1942    // Transfer selected metadata
1943    for (meta_key, meta_val) in field.metadata() {
1944        if is_internal_arrow_key(meta_key) {
1945            continue;
1946        }
1947        match meta_key.as_str() {
1948            AVRO_DOC_METADATA_KEY => {
1949                map.insert("doc".into(), Value::String(meta_val.clone()));
1950            }
1951            AVRO_FIELD_DEFAULT_METADATA_KEY => {
1952                let default_value = serde_json::from_str(meta_val)
1953                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1954                map.insert("default".into(), default_value);
1955            }
1956            _ => {
1957                let json_val = serde_json::from_str(meta_val)
1958                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1959                map.insert(meta_key.clone(), json_val);
1960            }
1961        }
1962    }
1963    Ok(Value::Object(map))
1964}
1965
1966#[cfg(test)]
1967mod tests {
1968    use super::*;
1969    use crate::codec::{AvroField, AvroFieldBuilder};
1970    use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1971    use serde_json::json;
1972    use std::sync::Arc;
1973
1974    fn int_schema() -> Schema<'static> {
1975        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1976    }
1977
1978    fn record_schema() -> Schema<'static> {
1979        Schema::Complex(ComplexType::Record(Record {
1980            name: "record1",
1981            namespace: Some("test.namespace"),
1982            doc: Some(Cow::from("A test record")),
1983            aliases: vec![],
1984            fields: vec![
1985                Field {
1986                    name: "field1",
1987                    doc: Some(Cow::from("An integer field")),
1988                    r#type: int_schema(),
1989                    default: None,
1990                    aliases: vec![],
1991                },
1992                Field {
1993                    name: "field2",
1994                    doc: None,
1995                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1996                    default: None,
1997                    aliases: vec![],
1998                },
1999            ],
2000            attributes: Attributes::default(),
2001        }))
2002    }
2003
2004    fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
2005        let mut sb = SchemaBuilder::new();
2006        sb.push(field);
2007        sb.finish()
2008    }
2009
2010    fn assert_json_contains(avro_json: &str, needle: &str) {
2011        assert!(
2012            avro_json.contains(needle),
2013            "JSON did not contain `{needle}` : {avro_json}"
2014        )
2015    }
2016
2017    #[test]
2018    fn test_deserialize() {
2019        let t: Schema = serde_json::from_str("\"string\"").unwrap();
2020        assert_eq!(
2021            t,
2022            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
2023        );
2024
2025        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
2026        assert_eq!(
2027            t,
2028            Schema::Union(vec![
2029                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2030                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2031            ])
2032        );
2033
2034        let t: Type = serde_json::from_str(
2035            r#"{
2036                   "type":"long",
2037                   "logicalType":"timestamp-micros"
2038                }"#,
2039        )
2040        .unwrap();
2041
2042        let timestamp = Type {
2043            r#type: TypeName::Primitive(PrimitiveType::Long),
2044            attributes: Attributes {
2045                logical_type: Some("timestamp-micros"),
2046                additional: Default::default(),
2047            },
2048        };
2049
2050        assert_eq!(t, timestamp);
2051
2052        let t: ComplexType = serde_json::from_str(
2053            r#"{
2054                   "type":"fixed",
2055                   "name":"fixed",
2056                   "namespace":"topLevelRecord.value",
2057                   "size":11,
2058                   "logicalType":"decimal",
2059                   "precision":25,
2060                   "scale":2
2061                }"#,
2062        )
2063        .unwrap();
2064
2065        let decimal = ComplexType::Fixed(Fixed {
2066            name: "fixed",
2067            namespace: Some("topLevelRecord.value"),
2068            aliases: vec![],
2069            size: 11,
2070            attributes: Attributes {
2071                logical_type: Some("decimal"),
2072                additional: vec![("precision", json!(25)), ("scale", json!(2))]
2073                    .into_iter()
2074                    .collect(),
2075            },
2076        });
2077
2078        assert_eq!(t, decimal);
2079
2080        let schema: Schema = serde_json::from_str(
2081            r#"{
2082               "type":"record",
2083               "name":"topLevelRecord",
2084               "fields":[
2085                  {
2086                     "name":"value",
2087                     "type":[
2088                        {
2089                           "type":"fixed",
2090                           "name":"fixed",
2091                           "namespace":"topLevelRecord.value",
2092                           "size":11,
2093                           "logicalType":"decimal",
2094                           "precision":25,
2095                           "scale":2
2096                        },
2097                        "null"
2098                     ]
2099                  }
2100               ]
2101            }"#,
2102        )
2103        .unwrap();
2104
2105        assert_eq!(
2106            schema,
2107            Schema::Complex(ComplexType::Record(Record {
2108                name: "topLevelRecord",
2109                namespace: None,
2110                doc: None,
2111                aliases: vec![],
2112                fields: vec![Field {
2113                    name: "value",
2114                    doc: None,
2115                    r#type: Schema::Union(vec![
2116                        Schema::Complex(decimal),
2117                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2118                    ]),
2119                    default: None,
2120                    aliases: vec![],
2121                },],
2122                attributes: Default::default(),
2123            }))
2124        );
2125
2126        let schema: Schema = serde_json::from_str(
2127            r#"{
2128                  "type": "record",
2129                  "name": "LongList",
2130                  "aliases": ["LinkedLongs"],
2131                  "fields" : [
2132                    {"name": "value", "type": "long"},
2133                    {"name": "next", "type": ["null", "LongList"]}
2134                  ]
2135                }"#,
2136        )
2137        .unwrap();
2138
2139        assert_eq!(
2140            schema,
2141            Schema::Complex(ComplexType::Record(Record {
2142                name: "LongList",
2143                namespace: None,
2144                doc: None,
2145                aliases: vec!["LinkedLongs"],
2146                fields: vec![
2147                    Field {
2148                        name: "value",
2149                        doc: None,
2150                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2151                        default: None,
2152                        aliases: vec![],
2153                    },
2154                    Field {
2155                        name: "next",
2156                        doc: None,
2157                        r#type: Schema::Union(vec![
2158                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2159                            Schema::TypeName(TypeName::Ref("LongList")),
2160                        ]),
2161                        default: None,
2162                        aliases: vec![],
2163                    }
2164                ],
2165                attributes: Attributes::default(),
2166            }))
2167        );
2168
2169        // Recursive schema are not supported
2170        let err = AvroField::try_from(&schema).unwrap_err().to_string();
2171        assert_eq!(err, "Parser error: Failed to resolve .LongList");
2172
2173        let schema: Schema = serde_json::from_str(
2174            r#"{
2175               "type":"record",
2176               "name":"topLevelRecord",
2177               "fields":[
2178                  {
2179                     "name":"id",
2180                     "type":[
2181                        "int",
2182                        "null"
2183                     ]
2184                  },
2185                  {
2186                     "name":"timestamp_col",
2187                     "type":[
2188                        {
2189                           "type":"long",
2190                           "logicalType":"timestamp-micros"
2191                        },
2192                        "null"
2193                     ]
2194                  }
2195               ]
2196            }"#,
2197        )
2198        .unwrap();
2199
2200        assert_eq!(
2201            schema,
2202            Schema::Complex(ComplexType::Record(Record {
2203                name: "topLevelRecord",
2204                namespace: None,
2205                doc: None,
2206                aliases: vec![],
2207                fields: vec![
2208                    Field {
2209                        name: "id",
2210                        doc: None,
2211                        r#type: Schema::Union(vec![
2212                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2213                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2214                        ]),
2215                        default: None,
2216                        aliases: vec![],
2217                    },
2218                    Field {
2219                        name: "timestamp_col",
2220                        doc: None,
2221                        r#type: Schema::Union(vec![
2222                            Schema::Type(timestamp),
2223                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2224                        ]),
2225                        default: None,
2226                        aliases: vec![],
2227                    }
2228                ],
2229                attributes: Default::default(),
2230            }))
2231        );
2232        let codec = AvroField::try_from(&schema).unwrap();
2233        let expected_arrow_field = arrow_schema::Field::new(
2234            "topLevelRecord",
2235            DataType::Struct(Fields::from(vec![
2236                arrow_schema::Field::new("id", DataType::Int32, true),
2237                arrow_schema::Field::new(
2238                    "timestamp_col",
2239                    DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2240                    true,
2241                ),
2242            ])),
2243            false,
2244        )
2245        .with_metadata(std::collections::HashMap::from([(
2246            AVRO_NAME_METADATA_KEY.to_string(),
2247            "topLevelRecord".to_string(),
2248        )]));
2249
2250        assert_eq!(codec.field(), expected_arrow_field);
2251
2252        let schema: Schema = serde_json::from_str(
2253            r#"{
2254                  "type": "record",
2255                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
2256                  "fields": [
2257                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
2258                    {"name": "clientProtocol", "type": ["null", "string"]},
2259                    {"name": "serverHash", "type": "MD5"},
2260                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
2261                  ]
2262            }"#,
2263        )
2264        .unwrap();
2265
2266        assert_eq!(
2267            schema,
2268            Schema::Complex(ComplexType::Record(Record {
2269                name: "HandshakeRequest",
2270                namespace: Some("org.apache.avro.ipc"),
2271                doc: None,
2272                aliases: vec![],
2273                fields: vec![
2274                    Field {
2275                        name: "clientHash",
2276                        doc: None,
2277                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
2278                            name: "MD5",
2279                            namespace: None,
2280                            aliases: vec![],
2281                            size: 16,
2282                            attributes: Default::default(),
2283                        })),
2284                        default: None,
2285                        aliases: vec![],
2286                    },
2287                    Field {
2288                        name: "clientProtocol",
2289                        doc: None,
2290                        r#type: Schema::Union(vec![
2291                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2292                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2293                        ]),
2294                        default: None,
2295                        aliases: vec![],
2296                    },
2297                    Field {
2298                        name: "serverHash",
2299                        doc: None,
2300                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
2301                        default: None,
2302                        aliases: vec![],
2303                    },
2304                    Field {
2305                        name: "meta",
2306                        doc: None,
2307                        r#type: Schema::Union(vec![
2308                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2309                            Schema::Complex(ComplexType::Map(Map {
2310                                values: Box::new(Schema::TypeName(TypeName::Primitive(
2311                                    PrimitiveType::Bytes
2312                                ))),
2313                                attributes: Default::default(),
2314                            })),
2315                        ]),
2316                        default: None,
2317                        aliases: vec![],
2318                    }
2319                ],
2320                attributes: Default::default(),
2321            }))
2322        );
2323    }
2324
2325    #[test]
2326    fn test_canonical_form_generation_comprehensive_record() {
2327        // NOTE: This schema is identical to the one used in test_deserialize_comprehensive.
2328        let json_str = r#"{
2329          "type": "record",
2330          "name": "E2eComprehensive",
2331          "namespace": "org.apache.arrow.avrotests.v1",
2332          "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
2333          "fields": [
2334            {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
2335            {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
2336            {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
2337            {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
2338            {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
2339            {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
2340            {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
2341            {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
2342            {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
2343            {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
2344            {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
2345            {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
2346            {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
2347            {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
2348            {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
2349            {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
2350            {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
2351            {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
2352            {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
2353            {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
2354            {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
2355            {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
2356            {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
2357            {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
2358            {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
2359            {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
2360            {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
2361                {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
2362                {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
2363                {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
2364            ]}, "doc": "Embedded Address record"},
2365            {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
2366                {"name": "user", "type": "string", "doc": "Username"},
2367                {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
2368            ]}},
2369            {"name": "union_enum_record_array_map", "type": [
2370                {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
2371                {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
2372                {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
2373                {"type": "array", "items": "long"},
2374                {"type": "map", "values": "string"}
2375            ], "doc": "Union of enum, two records, array, and map"},
2376            {"name": "union_date_or_fixed4", "type": [
2377                {"type": "int", "logicalType": "date"},
2378                {"type": "fixed", "name": "Fx4", "size": 4}
2379            ], "doc": "Union of date(int) or fixed(4)"},
2380            {"name": "union_interval_or_string", "type": [
2381                {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
2382                "string"
2383            ], "doc": "Union of duration(fixed12) or string"},
2384            {"name": "union_uuid_or_fixed10", "type": [
2385                {"type": "string", "logicalType": "uuid"},
2386                {"type": "fixed", "name": "Fx10", "size": 10}
2387            ], "doc": "Union of UUID string or fixed(10)"},
2388            {"name": "array_records_with_union", "type": {"type": "array", "items": {
2389                "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
2390                "fields": [
2391                    {"name": "key", "type": "string"},
2392                    {"name": "val", "type": ["null", "int", "long"], "default": null}
2393                ]
2394            }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
2395            {"name": "union_map_or_array_int", "type": [
2396                {"type": "map", "values": "int"},
2397                {"type": "array", "items": "int"}
2398            ], "doc": "Union[map<string,int>, array<int>]"},
2399            {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
2400            {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
2401                {"name": "name", "type": "string"},
2402                {"name": "age", "type": "int", "default": 0}
2403            ]}, "doc": "Record using type alias for schema evolution tests"}
2404          ]
2405        }"#;
2406        let avro = AvroSchema::new(json_str.to_string());
2407        let parsed = avro.schema().expect("schema should deserialize");
2408        let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrote
sts.v1.types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
2409        let canonical_form =
2410            AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
2411        assert_eq!(
2412            canonical_form, expected_canonical_form,
2413            "Canonical form must match Avro spec PCF exactly"
2414        );
2415    }
2416
2417    #[test]
2418    fn test_new_schema_store() {
2419        let store = SchemaStore::new();
2420        assert!(store.schemas.is_empty());
2421    }
2422
2423    #[test]
2424    fn test_try_from_schemas_rabin() {
2425        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2426        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2427        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2428        schemas.insert(
2429            int_avro_schema
2430                .fingerprint(FingerprintAlgorithm::Rabin)
2431                .unwrap(),
2432            int_avro_schema.clone(),
2433        );
2434        schemas.insert(
2435            record_avro_schema
2436                .fingerprint(FingerprintAlgorithm::Rabin)
2437                .unwrap(),
2438            record_avro_schema.clone(),
2439        );
2440        let store = SchemaStore::try_from(schemas).unwrap();
2441        let int_fp = int_avro_schema
2442            .fingerprint(FingerprintAlgorithm::Rabin)
2443            .unwrap();
2444        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2445        let rec_fp = record_avro_schema
2446            .fingerprint(FingerprintAlgorithm::Rabin)
2447            .unwrap();
2448        assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
2449    }
2450
2451    #[test]
2452    fn test_try_from_with_duplicates() {
2453        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2454        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2455        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2456        schemas.insert(
2457            int_avro_schema
2458                .fingerprint(FingerprintAlgorithm::Rabin)
2459                .unwrap(),
2460            int_avro_schema.clone(),
2461        );
2462        schemas.insert(
2463            record_avro_schema
2464                .fingerprint(FingerprintAlgorithm::Rabin)
2465                .unwrap(),
2466            record_avro_schema.clone(),
2467        );
2468        // Insert duplicate of int schema
2469        schemas.insert(
2470            int_avro_schema
2471                .fingerprint(FingerprintAlgorithm::Rabin)
2472                .unwrap(),
2473            int_avro_schema.clone(),
2474        );
2475        let store = SchemaStore::try_from(schemas).unwrap();
2476        assert_eq!(store.schemas.len(), 2);
2477        let int_fp = int_avro_schema
2478            .fingerprint(FingerprintAlgorithm::Rabin)
2479            .unwrap();
2480        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2481    }
2482
2483    #[test]
2484    fn test_register_and_lookup_rabin() {
2485        let mut store = SchemaStore::new();
2486        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2487        let fp_enum = store.register(schema.clone()).unwrap();
2488        match fp_enum {
2489            Fingerprint::Rabin(fp_val) => {
2490                assert_eq!(
2491                    store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
2492                    Some(schema.clone())
2493                );
2494                assert!(
2495                    store
2496                        .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
2497                        .is_none()
2498                );
2499            }
2500            Fingerprint::Id(_id) => {
2501                unreachable!("This test should only generate Rabin fingerprints")
2502            }
2503            Fingerprint::Id64(_id) => {
2504                unreachable!("This test should only generate Rabin fingerprints")
2505            }
2506            #[cfg(feature = "md5")]
2507            Fingerprint::MD5(_id) => {
2508                unreachable!("This test should only generate Rabin fingerprints")
2509            }
2510            #[cfg(feature = "sha256")]
2511            Fingerprint::SHA256(_id) => {
2512                unreachable!("This test should only generate Rabin fingerprints")
2513            }
2514        }
2515    }
2516
2517    #[test]
2518    fn test_set_and_lookup_id() {
2519        let mut store = SchemaStore::new();
2520        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2521        let id = 42u32;
2522        let fp = Fingerprint::Id(id);
2523        let out_fp = store.set(fp, schema.clone()).unwrap();
2524        assert_eq!(out_fp, fp);
2525        assert_eq!(store.lookup(&fp).cloned(), Some(schema.clone()));
2526        assert!(store.lookup(&Fingerprint::Id(id.wrapping_add(1))).is_none());
2527    }
2528
2529    #[test]
2530    fn test_set_and_lookup_id64() {
2531        let mut store = SchemaStore::new();
2532        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2533        let id64: u64 = 0xDEAD_BEEF_DEAD_BEEF;
2534        let fp = Fingerprint::Id64(id64);
2535        let out_fp = store.set(fp, schema.clone()).unwrap();
2536        assert_eq!(out_fp, fp, "set should return the same Id64 fingerprint");
2537        assert_eq!(
2538            store.lookup(&fp).cloned(),
2539            Some(schema.clone()),
2540            "lookup should find the schema by Id64"
2541        );
2542        assert!(
2543            store
2544                .lookup(&Fingerprint::Id64(id64.wrapping_add(1)))
2545                .is_none(),
2546            "lookup with a different Id64 must return None"
2547        );
2548    }
2549
2550    #[test]
2551    fn test_fingerprint_id64_conversions() {
2552        let algo_from_fp = FingerprintAlgorithm::from(&Fingerprint::Id64(123));
2553        assert_eq!(algo_from_fp, FingerprintAlgorithm::Id64);
2554        let fp_from_algo = Fingerprint::from(FingerprintAlgorithm::Id64);
2555        assert!(matches!(fp_from_algo, Fingerprint::Id64(0)));
2556        let strategy_from_fp = FingerprintStrategy::from(Fingerprint::Id64(5));
2557        assert!(matches!(strategy_from_fp, FingerprintStrategy::Id64(0)));
2558        let algo_from_strategy = FingerprintAlgorithm::from(strategy_from_fp);
2559        assert_eq!(algo_from_strategy, FingerprintAlgorithm::Id64);
2560    }
2561
2562    #[test]
2563    fn test_register_duplicate_schema() {
2564        let mut store = SchemaStore::new();
2565        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2566        let schema2 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2567        let fingerprint1 = store.register(schema1).unwrap();
2568        let fingerprint2 = store.register(schema2).unwrap();
2569        assert_eq!(fingerprint1, fingerprint2);
2570        assert_eq!(store.schemas.len(), 1);
2571    }
2572
2573    #[test]
2574    fn test_set_and_lookup_with_provided_fingerprint() {
2575        let mut store = SchemaStore::new();
2576        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2577        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2578        let out_fp = store.set(fp, schema.clone()).unwrap();
2579        assert_eq!(out_fp, fp);
2580        assert_eq!(store.lookup(&fp).cloned(), Some(schema));
2581    }
2582
2583    #[test]
2584    fn test_set_duplicate_same_schema_ok() {
2585        let mut store = SchemaStore::new();
2586        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2587        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2588        let _ = store.set(fp, schema.clone()).unwrap();
2589        let _ = store.set(fp, schema.clone()).unwrap();
2590        assert_eq!(store.schemas.len(), 1);
2591    }
2592
2593    #[test]
2594    fn test_set_duplicate_different_schema_collision_error() {
2595        let mut store = SchemaStore::new();
2596        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2597        let schema2 = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2598        // Use the same Fingerprint::Id to simulate a collision across different schemas
2599        let fp = Fingerprint::Id(123);
2600        let _ = store.set(fp, schema1).unwrap();
2601        let err = store.set(fp, schema2).unwrap_err();
2602        let msg = format!("{err}");
2603        assert!(msg.contains("Schema fingerprint collision"));
2604    }
2605
2606    #[test]
2607    fn test_canonical_form_generation_primitive() {
2608        let schema = int_schema();
2609        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2610        assert_eq!(canonical_form, r#""int""#);
2611    }
2612
2613    #[test]
2614    fn test_canonical_form_generation_record() {
2615        let schema = record_schema();
2616        let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2617        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2618        assert_eq!(canonical_form, expected_canonical_form);
2619    }
2620
2621    #[test]
2622    fn test_fingerprint_calculation() {
2623        let canonical_form = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
2624        let expected_fingerprint = 10505236152925314060;
2625        let fingerprint = compute_fingerprint_rabin(canonical_form);
2626        assert_eq!(fingerprint, expected_fingerprint);
2627    }
2628
2629    #[test]
2630    fn test_register_and_lookup_complex_schema() {
2631        let mut store = SchemaStore::new();
2632        let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2633        let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2634        let expected_fingerprint = Fingerprint::Rabin(compute_fingerprint_rabin(canonical_form));
2635        let fingerprint = store.register(schema.clone()).unwrap();
2636        assert_eq!(fingerprint, expected_fingerprint);
2637        let looked_up = store.lookup(&fingerprint).cloned();
2638        assert_eq!(looked_up, Some(schema));
2639    }
2640
2641    #[test]
2642    fn test_fingerprints_returns_all_keys() {
2643        let mut store = SchemaStore::new();
2644        let fp_int = store
2645            .register(AvroSchema::new(
2646                serde_json::to_string(&int_schema()).unwrap(),
2647            ))
2648            .unwrap();
2649        let fp_record = store
2650            .register(AvroSchema::new(
2651                serde_json::to_string(&record_schema()).unwrap(),
2652            ))
2653            .unwrap();
2654        let fps = store.fingerprints();
2655        assert_eq!(fps.len(), 2);
2656        assert!(fps.contains(&fp_int));
2657        assert!(fps.contains(&fp_record));
2658    }
2659
2660    #[test]
2661    fn test_canonical_form_strips_attributes() {
2662        let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
2663            name: "record_with_attrs",
2664            namespace: None,
2665            doc: Some(Cow::from("This doc should be stripped")),
2666            aliases: vec!["alias1", "alias2"],
2667            fields: vec![Field {
2668                name: "f1",
2669                doc: Some(Cow::from("field doc")),
2670                r#type: Schema::Type(Type {
2671                    r#type: TypeName::Primitive(PrimitiveType::Bytes),
2672                    attributes: Attributes {
2673                        logical_type: None,
2674                        additional: HashMap::from([("precision", json!(4))]),
2675                    },
2676                }),
2677                default: None,
2678                aliases: vec![],
2679            }],
2680            attributes: Attributes {
2681                logical_type: None,
2682                additional: HashMap::from([("custom_attr", json!("value"))]),
2683            },
2684        }));
2685        let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
2686        let canonical_form = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
2687        assert_eq!(canonical_form, expected_canonical_form);
2688    }
2689
2690    #[cfg(not(feature = "avro_custom_types"))]
2691    #[test]
2692    fn test_primitive_mappings() {
2693        let cases = vec![
2694            (DataType::Boolean, "\"boolean\""),
2695            (DataType::Int8, "\"int\""),
2696            (DataType::Int16, "\"int\""),
2697            (DataType::Int32, "\"int\""),
2698            (DataType::Int64, "\"long\""),
2699            (DataType::UInt8, "\"int\""),
2700            (DataType::UInt16, "\"int\""),
2701            (DataType::UInt32, "\"long\""),
2702            (DataType::UInt64, "\"long\""),
2703            (DataType::Float16, "\"float\""),
2704            (DataType::Float32, "\"float\""),
2705            (DataType::Float64, "\"double\""),
2706            (DataType::Utf8, "\"string\""),
2707            (DataType::Binary, "\"bytes\""),
2708        ];
2709        for (dt, avro_token) in cases {
2710            let field = ArrowField::new("col", dt.clone(), false);
2711            let arrow_schema = single_field_schema(field);
2712            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2713            assert_json_contains(&avro.json_string, avro_token);
2714        }
2715    }
2716
2717    #[cfg(feature = "avro_custom_types")]
2718    #[test]
2719    fn test_primitive_mappings() {
2720        let cases = vec![
2721            (DataType::Boolean, "\"boolean\""),
2722            (DataType::Int8, "\"logicalType\":\"arrow.int8\""),
2723            (DataType::Int16, "\"logicalType\":\"arrow.int16\""),
2724            (DataType::Int32, "\"int\""),
2725            (DataType::Int64, "\"long\""),
2726            (DataType::UInt8, "\"logicalType\":\"arrow.uint8\""),
2727            (DataType::UInt16, "\"logicalType\":\"arrow.uint16\""),
2728            (DataType::UInt32, "\"logicalType\":\"arrow.uint32\""),
2729            (DataType::UInt64, "\"logicalType\":\"arrow.uint64\""),
2730            (DataType::Float16, "\"logicalType\":\"arrow.float16\""),
2731            (DataType::Float32, "\"float\""),
2732            (DataType::Float64, "\"double\""),
2733            (DataType::Utf8, "\"string\""),
2734            (DataType::Binary, "\"bytes\""),
2735        ];
2736        for (dt, avro_token) in cases {
2737            let field = ArrowField::new("col", dt.clone(), false);
2738            let arrow_schema = single_field_schema(field);
2739            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2740            assert_json_contains(&avro.json_string, avro_token);
2741        }
2742    }
2743
2744    #[cfg(feature = "avro_custom_types")]
2745    #[test]
2746    fn test_custom_fixed_logical_types_preserve_namespace_metadata() {
2747        let namespace = "com.example.types";
2748
2749        let mut md_u64 = HashMap::new();
2750        md_u64.insert(AVRO_NAME_METADATA_KEY.to_string(), "U64Type".to_string());
2751        md_u64.insert(
2752            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2753            namespace.to_string(),
2754        );
2755
2756        let mut md_f16 = HashMap::new();
2757        md_f16.insert(AVRO_NAME_METADATA_KEY.to_string(), "F16Type".to_string());
2758        md_f16.insert(
2759            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2760            namespace.to_string(),
2761        );
2762
2763        let mut md_iv_ym = HashMap::new();
2764        md_iv_ym.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvYmType".to_string());
2765        md_iv_ym.insert(
2766            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2767            namespace.to_string(),
2768        );
2769
2770        let mut md_iv_dt = HashMap::new();
2771        md_iv_dt.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvDtType".to_string());
2772        md_iv_dt.insert(
2773            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2774            namespace.to_string(),
2775        );
2776
2777        let arrow_schema = ArrowSchema::new(vec![
2778            ArrowField::new("u64_col", DataType::UInt64, false).with_metadata(md_u64),
2779            ArrowField::new("f16_col", DataType::Float16, false).with_metadata(md_f16),
2780            ArrowField::new(
2781                "iv_ym_col",
2782                DataType::Interval(IntervalUnit::YearMonth),
2783                false,
2784            )
2785            .with_metadata(md_iv_ym),
2786            ArrowField::new(
2787                "iv_dt_col",
2788                DataType::Interval(IntervalUnit::DayTime),
2789                false,
2790            )
2791            .with_metadata(md_iv_dt),
2792        ]);
2793
2794        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2795        let root: Value = serde_json::from_str(&avro.json_string).unwrap();
2796        let fields = root
2797            .get("fields")
2798            .and_then(|f| f.as_array())
2799            .expect("record fields array");
2800
2801        let expected = [
2802            ("u64_col", "arrow.uint64"),
2803            ("f16_col", "arrow.float16"),
2804            ("iv_ym_col", "arrow.interval-year-month"),
2805            ("iv_dt_col", "arrow.interval-day-time"),
2806        ];
2807
2808        for (field_name, logical_type) in expected {
2809            let field = fields
2810                .iter()
2811                .find(|f| f.get("name").and_then(Value::as_str) == Some(field_name))
2812                .unwrap_or_else(|| panic!("missing field {field_name}"));
2813            let ty = field
2814                .get("type")
2815                .and_then(Value::as_object)
2816                .unwrap_or_else(|| panic!("field {field_name} type must be object"));
2817
2818            assert_eq!(ty.get("type").and_then(Value::as_str), Some("fixed"));
2819            assert_eq!(
2820                ty.get("logicalType").and_then(Value::as_str),
2821                Some(logical_type)
2822            );
2823            assert_eq!(
2824                ty.get("namespace").and_then(Value::as_str),
2825                Some(namespace),
2826                "field {field_name} must preserve avro.namespace metadata"
2827            );
2828        }
2829    }
2830
2831    #[cfg(feature = "avro_custom_types")]
2832    #[test]
2833    fn test_custom_fixed_logical_types_omit_namespace_without_metadata() {
2834        let mut md_u64 = HashMap::new();
2835        md_u64.insert(AVRO_NAME_METADATA_KEY.to_string(), "U64Type".to_string());
2836
2837        let mut md_f16 = HashMap::new();
2838        md_f16.insert(AVRO_NAME_METADATA_KEY.to_string(), "F16Type".to_string());
2839
2840        let mut md_iv_ym = HashMap::new();
2841        md_iv_ym.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvYmType".to_string());
2842
2843        let mut md_iv_dt = HashMap::new();
2844        md_iv_dt.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvDtType".to_string());
2845
2846        let arrow_schema = ArrowSchema::new(vec![
2847            ArrowField::new("u64_col", DataType::UInt64, false).with_metadata(md_u64),
2848            ArrowField::new("f16_col", DataType::Float16, false).with_metadata(md_f16),
2849            ArrowField::new(
2850                "iv_ym_col",
2851                DataType::Interval(IntervalUnit::YearMonth),
2852                false,
2853            )
2854            .with_metadata(md_iv_ym),
2855            ArrowField::new(
2856                "iv_dt_col",
2857                DataType::Interval(IntervalUnit::DayTime),
2858                false,
2859            )
2860            .with_metadata(md_iv_dt),
2861        ]);
2862
2863        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2864        let root: Value = serde_json::from_str(&avro.json_string).unwrap();
2865        let fields = root
2866            .get("fields")
2867            .and_then(|f| f.as_array())
2868            .expect("record fields array");
2869
2870        for field_name in ["u64_col", "f16_col", "iv_ym_col", "iv_dt_col"] {
2871            let field = fields
2872                .iter()
2873                .find(|f| f.get("name").and_then(Value::as_str) == Some(field_name))
2874                .unwrap_or_else(|| panic!("missing field {field_name}"));
2875            let ty = field
2876                .get("type")
2877                .and_then(Value::as_object)
2878                .unwrap_or_else(|| panic!("field {field_name} type must be object"));
2879
2880            assert_eq!(ty.get("type").and_then(Value::as_str), Some("fixed"));
2881            assert!(
2882                !ty.contains_key("namespace"),
2883                "field {field_name} should not include namespace when metadata lacks avro.namespace"
2884            );
2885        }
2886    }
2887
2888    #[test]
2889    fn test_temporal_mappings() {
2890        let cases = vec![
2891            (DataType::Date32, "\"logicalType\":\"date\""),
2892            (
2893                DataType::Time32(TimeUnit::Millisecond),
2894                "\"logicalType\":\"time-millis\"",
2895            ),
2896            (
2897                DataType::Time64(TimeUnit::Microsecond),
2898                "\"logicalType\":\"time-micros\"",
2899            ),
2900            (
2901                DataType::Timestamp(TimeUnit::Millisecond, None),
2902                "\"logicalType\":\"local-timestamp-millis\"",
2903            ),
2904            (
2905                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2906                "\"logicalType\":\"timestamp-micros\"",
2907            ),
2908        ];
2909        for (dt, needle) in cases {
2910            let field = ArrowField::new("ts", dt.clone(), true);
2911            let arrow_schema = single_field_schema(field);
2912            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2913            assert_json_contains(&avro.json_string, needle);
2914        }
2915    }
2916
2917    #[test]
2918    fn test_decimal_and_uuid() {
2919        let decimal_field = ArrowField::new("amount", DataType::Decimal128(25, 2), false);
2920        let dec_schema = single_field_schema(decimal_field);
2921        let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
2922        assert_json_contains(&avro_dec.json_string, "\"logicalType\":\"decimal\"");
2923        assert_json_contains(&avro_dec.json_string, "\"precision\":25");
2924        assert_json_contains(&avro_dec.json_string, "\"scale\":2");
2925        let mut md = HashMap::new();
2926        md.insert("logicalType".into(), "uuid".into());
2927        let uuid_field =
2928            ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(md);
2929        let uuid_schema = single_field_schema(uuid_field);
2930        let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
2931        assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
2932    }
2933
2934    #[cfg(not(feature = "avro_custom_types"))]
2935    #[test]
2936    fn test_interval_month_day_nano_duration_schema() {
2937        let interval_field = ArrowField::new(
2938            "span",
2939            DataType::Interval(IntervalUnit::MonthDayNano),
2940            false,
2941        );
2942        let s = single_field_schema(interval_field);
2943        let avro = AvroSchema::try_from(&s).unwrap();
2944        assert_json_contains(&avro.json_string, "\"logicalType\":\"duration\"");
2945        assert_json_contains(&avro.json_string, "\"size\":12");
2946    }
2947
2948    #[cfg(feature = "avro_custom_types")]
2949    #[test]
2950    fn test_interval_month_day_nano_custom_schema() {
2951        let interval_field = ArrowField::new(
2952            "span",
2953            DataType::Interval(IntervalUnit::MonthDayNano),
2954            false,
2955        );
2956        let s = single_field_schema(interval_field);
2957        let avro = AvroSchema::try_from(&s).unwrap();
2958        assert_json_contains(
2959            &avro.json_string,
2960            "\"logicalType\":\"arrow.interval-month-day-nano\"",
2961        );
2962        assert_json_contains(&avro.json_string, "\"size\":16");
2963    }
2964
2965    #[cfg(feature = "avro_custom_types")]
2966    #[test]
2967    fn test_duration_custom_logical_type() {
2968        let dur_field = ArrowField::new("latency", DataType::Duration(TimeUnit::Nanosecond), false);
2969        let s2 = single_field_schema(dur_field);
2970        let avro2 = AvroSchema::try_from(&s2).unwrap();
2971        assert_json_contains(
2972            &avro2.json_string,
2973            "\"logicalType\":\"arrow.duration-nanos\"",
2974        );
2975    }
2976
2977    #[test]
2978    fn test_complex_types() {
2979        let list_dt = DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true)));
2980        let list_schema = single_field_schema(ArrowField::new("numbers", list_dt, false));
2981        let avro_list = AvroSchema::try_from(&list_schema).unwrap();
2982        assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
2983        assert_json_contains(&avro_list.json_string, "\"items\"");
2984        let value_field = ArrowField::new("value", DataType::Boolean, true);
2985        let entries_struct = ArrowField::new(
2986            "entries",
2987            DataType::Struct(Fields::from(vec![
2988                ArrowField::new("key", DataType::Utf8, false),
2989                value_field.clone(),
2990            ])),
2991            false,
2992        );
2993        let map_dt = DataType::Map(Arc::new(entries_struct), false);
2994        let map_schema = single_field_schema(ArrowField::new("props", map_dt, false));
2995        let avro_map = AvroSchema::try_from(&map_schema).unwrap();
2996        assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
2997        assert_json_contains(&avro_map.json_string, "\"values\"");
2998        let struct_dt = DataType::Struct(Fields::from(vec![
2999            ArrowField::new("f1", DataType::Int64, false),
3000            ArrowField::new("f2", DataType::Utf8, true),
3001        ]));
3002        let struct_schema = single_field_schema(ArrowField::new("person", struct_dt, true));
3003        let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
3004        assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
3005        assert_json_contains(&avro_struct.json_string, "\"null\"");
3006    }
3007
3008    #[test]
3009    fn test_enum_dictionary() {
3010        let mut md = HashMap::new();
3011        md.insert(
3012            AVRO_ENUM_SYMBOLS_METADATA_KEY.into(),
3013            "[\"OPEN\",\"CLOSED\"]".into(),
3014        );
3015        let enum_dt = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
3016        let field = ArrowField::new("status", enum_dt, false).with_metadata(md);
3017        let schema = single_field_schema(field);
3018        let avro = AvroSchema::try_from(&schema).unwrap();
3019        assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
3020        assert_json_contains(&avro.json_string, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
3021    }
3022
3023    #[test]
3024    fn test_run_end_encoded() {
3025        let ree_dt = DataType::RunEndEncoded(
3026            Arc::new(ArrowField::new("run_ends", DataType::Int32, false)),
3027            Arc::new(ArrowField::new("values", DataType::Utf8, false)),
3028        );
3029        let s = single_field_schema(ArrowField::new("text", ree_dt, false));
3030        let avro = AvroSchema::try_from(&s).unwrap();
3031        assert_json_contains(&avro.json_string, "\"string\"");
3032    }
3033
3034    #[test]
3035    fn test_dense_union() {
3036        let uf: UnionFields = vec![
3037            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
3038            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
3039        ]
3040        .into_iter()
3041        .collect();
3042        let union_dt = DataType::Union(uf, UnionMode::Dense);
3043        let s = single_field_schema(ArrowField::new("u", union_dt, false));
3044        let avro =
3045            AvroSchema::try_from(&s).expect("Arrow Union -> Avro union conversion should succeed");
3046        let v: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
3047        let fields = v
3048            .get("fields")
3049            .and_then(|x| x.as_array())
3050            .expect("fields array");
3051        let u_field = fields
3052            .iter()
3053            .find(|f| f.get("name").and_then(|n| n.as_str()) == Some("u"))
3054            .expect("field 'u'");
3055        let union = u_field.get("type").expect("u.type");
3056        let arr = union.as_array().expect("u.type must be Avro union array");
3057        assert_eq!(arr.len(), 2, "expected two union branches");
3058        let first = &arr[0];
3059        let obj = first
3060            .as_object()
3061            .expect("first branch should be an object with metadata");
3062        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
3063        assert_eq!(
3064            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
3065            Some("dense")
3066        );
3067        let type_ids: Vec<i64> = obj
3068            .get("arrowUnionTypeIds")
3069            .and_then(|a| a.as_array())
3070            .expect("arrowUnionTypeIds array")
3071            .iter()
3072            .map(|n| n.as_i64().expect("i64"))
3073            .collect();
3074        assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");
3075        assert_eq!(arr[1], Value::String("string".into()));
3076    }
3077
3078    #[test]
3079    fn round_trip_primitive() {
3080        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", DataType::Int32, false)]);
3081        let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
3082        let decoded = avro_schema.schema().unwrap();
3083        assert!(matches!(decoded, Schema::Complex(_)));
3084    }
3085
3086    #[test]
3087    fn test_name_generator_sanitization_and_uniqueness() {
3088        let f1 = ArrowField::new("weird-name", DataType::FixedSizeBinary(8), false);
3089        let f2 = ArrowField::new("weird name", DataType::FixedSizeBinary(8), false);
3090        let f3 = ArrowField::new("123bad", DataType::FixedSizeBinary(8), false);
3091        let arrow_schema = ArrowSchema::new(vec![f1, f2, f3]);
3092        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
3093        assert_json_contains(&avro.json_string, "\"name\":\"weird_name\"");
3094        assert_json_contains(&avro.json_string, "\"name\":\"weird_name_1\"");
3095        assert_json_contains(&avro.json_string, "\"name\":\"_123bad\"");
3096    }
3097
3098    #[cfg(not(feature = "avro_custom_types"))]
3099    #[test]
3100    fn test_date64_logical_type_mapping() {
3101        let field = ArrowField::new("d", DataType::Date64, true);
3102        let schema = single_field_schema(field);
3103        let avro = AvroSchema::try_from(&schema).unwrap();
3104        assert_json_contains(
3105            &avro.json_string,
3106            "\"logicalType\":\"local-timestamp-millis\"",
3107        );
3108    }
3109
3110    #[cfg(feature = "avro_custom_types")]
3111    #[test]
3112    fn test_date64_logical_type_mapping_custom() {
3113        let field = ArrowField::new("d", DataType::Date64, true);
3114        let schema = single_field_schema(field);
3115        let avro = AvroSchema::try_from(&schema).unwrap();
3116        assert_json_contains(&avro.json_string, "\"logicalType\":\"arrow.date64\"");
3117    }
3118
3119    #[cfg(feature = "avro_custom_types")]
3120    #[test]
3121    fn test_duration_list_extras_propagated() {
3122        let child = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
3123        let list_dt = DataType::List(Arc::new(child));
3124        let arrow_schema = single_field_schema(ArrowField::new("durations", list_dt, false));
3125        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
3126        assert_json_contains(
3127            &avro.json_string,
3128            "\"logicalType\":\"arrow.duration-micros\"",
3129        );
3130    }
3131
3132    #[cfg(not(feature = "avro_custom_types"))]
3133    #[test]
3134    fn test_interval_yearmonth_extra() {
3135        let field = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
3136        let schema = single_field_schema(field);
3137        let avro = AvroSchema::try_from(&schema).unwrap();
3138        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
3139    }
3140
3141    #[cfg(not(feature = "avro_custom_types"))]
3142    #[test]
3143    fn test_interval_daytime_extra() {
3144        let field = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
3145        let schema = single_field_schema(field);
3146        let avro = AvroSchema::try_from(&schema).unwrap();
3147        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"daytime\"");
3148    }
3149
3150    #[cfg(feature = "avro_custom_types")]
3151    #[test]
3152    fn test_interval_yearmonth_custom() {
3153        let field = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
3154        let schema = single_field_schema(field);
3155        let avro = AvroSchema::try_from(&schema).unwrap();
3156        assert_json_contains(
3157            &avro.json_string,
3158            "\"logicalType\":\"arrow.interval-year-month\"",
3159        );
3160    }
3161
3162    #[cfg(feature = "avro_custom_types")]
3163    #[test]
3164    fn test_interval_daytime_custom() {
3165        let field = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
3166        let schema = single_field_schema(field);
3167        let avro = AvroSchema::try_from(&schema).unwrap();
3168        assert_json_contains(
3169            &avro.json_string,
3170            "\"logicalType\":\"arrow.interval-day-time\"",
3171        );
3172    }
3173
3174    #[test]
3175    fn test_fixed_size_list_extra() {
3176        let child = ArrowField::new("item", DataType::Int32, false);
3177        let dt = DataType::FixedSizeList(Arc::new(child), 3);
3178        let schema = single_field_schema(ArrowField::new("triples", dt, false));
3179        let avro = AvroSchema::try_from(&schema).unwrap();
3180        assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
3181    }
3182
3183    #[cfg(feature = "avro_custom_types")]
3184    #[test]
3185    fn test_map_duration_value_extra() {
3186        let val_field = ArrowField::new("value", DataType::Duration(TimeUnit::Second), true);
3187        let entries_struct = ArrowField::new(
3188            "entries",
3189            DataType::Struct(Fields::from(vec![
3190                ArrowField::new("key", DataType::Utf8, false),
3191                val_field,
3192            ])),
3193            false,
3194        );
3195        let map_dt = DataType::Map(Arc::new(entries_struct), false);
3196        let schema = single_field_schema(ArrowField::new("metrics", map_dt, false));
3197        let avro = AvroSchema::try_from(&schema).unwrap();
3198        assert_json_contains(
3199            &avro.json_string,
3200            "\"logicalType\":\"arrow.duration-seconds\"",
3201        );
3202    }
3203
3204    #[test]
3205    fn test_schema_with_non_string_defaults_decodes_successfully() {
3206        let schema_json = r#"{
3207            "type": "record",
3208            "name": "R",
3209            "fields": [
3210                {"name": "a", "type": "int", "default": 0},
3211                {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
3212                {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
3213                {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
3214                    {"name": "flag", "type": "boolean", "default": true},
3215                    {"name": "name", "type": "string", "default": "hi"}
3216                ]}, "default": {"flag": false, "name": "d"}},
3217                {"name": "u", "type": ["int", "null"], "default": 42}
3218            ]
3219        }"#;
3220        let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
3221        match &schema {
3222            Schema::Complex(ComplexType::Record(_)) => {}
3223            other => panic!("expected record schema, got: {:?}", other),
3224        }
3225        // Avro to Arrow conversion
3226        let field = crate::codec::AvroField::try_from(&schema)
3227            .expect("Avro->Arrow conversion should succeed");
3228        let arrow_field = field.field();
3229        // Build expected Arrow field
3230        let expected_list_item = ArrowField::new(
3231            arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
3232            DataType::Int64,
3233            false,
3234        );
3235        let expected_b = ArrowField::new("b", DataType::List(Arc::new(expected_list_item)), false);
3236
3237        let expected_map_value = ArrowField::new("value", DataType::Float64, false);
3238        let expected_entries = ArrowField::new(
3239            "entries",
3240            DataType::Struct(Fields::from(vec![
3241                ArrowField::new("key", DataType::Utf8, false),
3242                expected_map_value,
3243            ])),
3244            false,
3245        );
3246        let expected_c =
3247            ArrowField::new("c", DataType::Map(Arc::new(expected_entries), false), false);
3248        let mut inner_md = std::collections::HashMap::new();
3249        inner_md.insert(AVRO_NAME_METADATA_KEY.to_string(), "Inner".to_string());
3250        let expected_inner = ArrowField::new(
3251            "inner",
3252            DataType::Struct(Fields::from(vec![
3253                ArrowField::new("flag", DataType::Boolean, false),
3254                ArrowField::new("name", DataType::Utf8, false),
3255            ])),
3256            false,
3257        )
3258        .with_metadata(inner_md);
3259        let mut root_md = std::collections::HashMap::new();
3260        root_md.insert(AVRO_NAME_METADATA_KEY.to_string(), "R".to_string());
3261        let expected = ArrowField::new(
3262            "R",
3263            DataType::Struct(Fields::from(vec![
3264                ArrowField::new("a", DataType::Int32, false),
3265                expected_b,
3266                expected_c,
3267                expected_inner,
3268                ArrowField::new("u", DataType::Int32, true),
3269            ])),
3270            false,
3271        )
3272        .with_metadata(root_md);
3273        assert_eq!(arrow_field, expected);
3274    }
3275
3276    #[test]
3277    fn default_order_is_consistent() {
3278        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
3279        let a = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
3280        let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
3281        assert_eq!(a, b.unwrap().json_string);
3282    }
3283
3284    #[test]
3285    fn test_union_branch_missing_name_errors() {
3286        for t in ["record", "enum", "fixed"] {
3287            let branch = json!({ "type": t });
3288            let err = union_branch_signature(&branch).unwrap_err().to_string();
3289            assert!(
3290                err.contains(&format!("Union branch '{t}' missing required 'name'")),
3291                "expected missing-name error for {t}, got: {err}"
3292            );
3293        }
3294    }
3295
3296    #[test]
3297    fn test_union_branch_named_type_signature_includes_name() {
3298        let rec = json!({ "type": "record", "name": "Foo" });
3299        assert_eq!(union_branch_signature(&rec).unwrap(), "N:record:Foo");
3300        let en = json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] });
3301        assert_eq!(union_branch_signature(&en).unwrap(), "N:enum:Color");
3302        let fx = json!({ "type": "fixed", "name": "Bytes16", "size": 16 });
3303        assert_eq!(union_branch_signature(&fx).unwrap(), "N:fixed:Bytes16");
3304    }
3305
3306    #[test]
3307    fn test_record_field_alias_resolution_without_default() {
3308        let writer_json = r#"{
3309          "type":"record",
3310          "name":"R",
3311          "fields":[{"name":"old","type":"int"}]
3312        }"#;
3313        let reader_json = r#"{
3314          "type":"record",
3315          "name":"R",
3316          "fields":[{"name":"new","aliases":["old"],"type":"int"}]
3317        }"#;
3318        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3319        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3320        let resolved = AvroFieldBuilder::new(&writer)
3321            .with_reader_schema(&reader)
3322            .with_utf8view(false)
3323            .with_strict_mode(false)
3324            .build()
3325            .unwrap();
3326        let expected = ArrowField::new(
3327            "R",
3328            DataType::Struct(Fields::from(vec![ArrowField::new(
3329                "new",
3330                DataType::Int32,
3331                false,
3332            )])),
3333            false,
3334        );
3335        assert_eq!(resolved.field(), expected);
3336    }
3337
3338    #[test]
3339    fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
3340        let writer_json = r#"{
3341          "type":"record",
3342          "name":"R",
3343          "fields":[
3344            {"name":"a","type":"int","aliases":["old"]},
3345            {"name":"b","type":"int","aliases":["old"]}
3346          ]
3347        }"#;
3348        let reader_json = r#"{
3349          "type":"record",
3350          "name":"R",
3351          "fields":[{"name":"target","type":"int","aliases":["old"]}]
3352        }"#;
3353        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3354        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3355        let err = AvroFieldBuilder::new(&writer)
3356            .with_reader_schema(&reader)
3357            .with_utf8view(false)
3358            .with_strict_mode(true)
3359            .build()
3360            .unwrap_err()
3361            .to_string();
3362        assert!(
3363            err.contains("Ambiguous alias 'old'"),
3364            "expected ambiguous-alias error, got: {err}"
3365        );
3366    }
3367
3368    #[test]
3369    fn test_pragmatic_writer_field_alias_mapping_non_strict() {
3370        let writer_json = r#"{
3371          "type":"record",
3372          "name":"R",
3373          "fields":[{"name":"before","type":"int","aliases":["now"]}]
3374        }"#;
3375        let reader_json = r#"{
3376          "type":"record",
3377          "name":"R",
3378          "fields":[{"name":"now","type":"int"}]
3379        }"#;
3380        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3381        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3382        let resolved = AvroFieldBuilder::new(&writer)
3383            .with_reader_schema(&reader)
3384            .with_utf8view(false)
3385            .with_strict_mode(false)
3386            .build()
3387            .unwrap();
3388        let expected = ArrowField::new(
3389            "R",
3390            DataType::Struct(Fields::from(vec![ArrowField::new(
3391                "now",
3392                DataType::Int32,
3393                false,
3394            )])),
3395            false,
3396        );
3397        assert_eq!(resolved.field(), expected);
3398    }
3399
3400    #[test]
3401    fn test_missing_reader_field_null_first_no_default_is_ok() {
3402        let writer_json = r#"{
3403          "type":"record",
3404          "name":"R",
3405          "fields":[{"name":"a","type":"int"}]
3406        }"#;
3407        let reader_json = r#"{
3408          "type":"record",
3409          "name":"R",
3410          "fields":[
3411            {"name":"a","type":"int"},
3412            {"name":"b","type":["null","int"]}
3413          ]
3414        }"#;
3415        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3416        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3417        let resolved = AvroFieldBuilder::new(&writer)
3418            .with_reader_schema(&reader)
3419            .with_utf8view(false)
3420            .with_strict_mode(false)
3421            .build()
3422            .unwrap();
3423        let expected = ArrowField::new(
3424            "R",
3425            DataType::Struct(Fields::from(vec![
3426                ArrowField::new("a", DataType::Int32, false),
3427                ArrowField::new("b", DataType::Int32, true).with_metadata(HashMap::from([(
3428                    AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
3429                    "null".to_string(),
3430                )])),
3431            ])),
3432            false,
3433        );
3434        assert_eq!(resolved.field(), expected);
3435    }
3436
3437    #[test]
3438    fn test_missing_reader_field_null_second_without_default_errors() {
3439        let writer_json = r#"{
3440          "type":"record",
3441          "name":"R",
3442          "fields":[{"name":"a","type":"int"}]
3443        }"#;
3444        let reader_json = r#"{
3445          "type":"record",
3446          "name":"R",
3447          "fields":[
3448            {"name":"a","type":"int"},
3449            {"name":"b","type":["int","null"]}
3450          ]
3451        }"#;
3452        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3453        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3454        let err = AvroFieldBuilder::new(&writer)
3455            .with_reader_schema(&reader)
3456            .with_utf8view(false)
3457            .with_strict_mode(false)
3458            .build()
3459            .unwrap_err()
3460            .to_string();
3461        assert!(
3462            err.contains("must have a default value"),
3463            "expected missing-default error, got: {err}"
3464        );
3465    }
3466
3467    #[test]
3468    fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
3469        let field = ArrowField::new("x", DataType::Int32, true);
3470        let injected_json =
3471            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3472                .to_string();
3473        let mut md = HashMap::new();
3474        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json.clone());
3475        md.insert("custom".to_string(), "123".to_string());
3476        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3477        let opts = AvroSchemaOptions {
3478            null_order: Some(Nullability::NullSecond),
3479            strip_metadata: false,
3480        };
3481        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3482        assert_eq!(
3483            out.json_string, injected_json,
3484            "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
3485        );
3486        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3487        assert_eq!(v.get("type").and_then(|t| t.as_str()), Some("record"));
3488        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("Injected"));
3489    }
3490
3491    #[test]
3492    fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
3493        let field = ArrowField::new("x", DataType::Int32, true);
3494        let injected_json =
3495            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3496                .to_string();
3497        let mut md = HashMap::new();
3498        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json);
3499        md.insert("custom_meta".to_string(), "7".to_string());
3500        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3501        let opts = AvroSchemaOptions {
3502            null_order: Some(Nullability::NullFirst),
3503            strip_metadata: true,
3504        };
3505        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3506        assert_json_contains(&out.json_string, "\"type\":\"record\"");
3507        assert_json_contains(&out.json_string, "\"name\":\"topLevelRecord\"");
3508        assert_json_contains(&out.json_string, "\"custom_meta\":7");
3509    }
3510
3511    #[test]
3512    fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
3513        let field = ArrowField::new("s", DataType::Utf8, true);
3514        let arrow_schema = single_field_schema(field);
3515        let opts = AvroSchemaOptions {
3516            null_order: Some(Nullability::NullFirst),
3517            strip_metadata: true,
3518        };
3519        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3520        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3521        let arr = v["fields"][0]["type"]
3522            .as_array()
3523            .expect("nullable primitive should be Avro union array");
3524        assert_eq!(arr[0], Value::String("null".into()));
3525        assert_eq!(arr[1], Value::String("string".into()));
3526    }
3527
3528    #[test]
3529    fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
3530        let field = ArrowField::new("s", DataType::Utf8, true);
3531        let arrow_schema = single_field_schema(field);
3532        let opts = AvroSchemaOptions {
3533            null_order: Some(Nullability::NullSecond),
3534            strip_metadata: true,
3535        };
3536        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3537        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3538        let arr = v["fields"][0]["type"]
3539            .as_array()
3540            .expect("nullable primitive should be Avro union array");
3541        assert_eq!(arr[0], Value::String("string".into()));
3542        assert_eq!(arr[1], Value::String("null".into()));
3543    }
3544
3545    #[test]
3546    fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
3547        let uf: UnionFields = vec![
3548            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
3549            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
3550        ]
3551        .into_iter()
3552        .collect();
3553        let union_dt = DataType::Union(uf, UnionMode::Dense);
3554        let arrow_schema = single_field_schema(ArrowField::new("u", union_dt, true));
3555        let with_extras = AvroSchema::from_arrow_with_options(
3556            &arrow_schema,
3557            Some(AvroSchemaOptions {
3558                null_order: Some(Nullability::NullFirst),
3559                strip_metadata: false,
3560            }),
3561        )
3562        .unwrap();
3563        let v_with: Value = serde_json::from_str(&with_extras.json_string).unwrap();
3564        let union_arr = v_with["fields"][0]["type"].as_array().expect("union array");
3565        let first_obj = union_arr
3566            .iter()
3567            .find(|b| b.is_object())
3568            .expect("expected an object branch with extras");
3569        let obj = first_obj.as_object().unwrap();
3570        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
3571        assert_eq!(
3572            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
3573            Some("dense")
3574        );
3575        let type_ids: Vec<i64> = obj["arrowUnionTypeIds"]
3576            .as_array()
3577            .expect("arrowUnionTypeIds array")
3578            .iter()
3579            .map(|n| n.as_i64().expect("i64"))
3580            .collect();
3581        assert_eq!(type_ids, vec![2, 7]);
3582        let stripped = AvroSchema::from_arrow_with_options(
3583            &arrow_schema,
3584            Some(AvroSchemaOptions {
3585                null_order: Some(Nullability::NullFirst),
3586                strip_metadata: true,
3587            }),
3588        )
3589        .unwrap();
3590        let v_stripped: Value = serde_json::from_str(&stripped.json_string).unwrap();
3591        let union_arr2 = v_stripped["fields"][0]["type"]
3592            .as_array()
3593            .expect("union array");
3594        assert!(
3595            !union_arr2.iter().any(|b| b
3596                .as_object()
3597                .is_some_and(|m| m.contains_key("arrowUnionMode"))),
3598            "extras must be removed when strip_metadata=true"
3599        );
3600        assert_eq!(union_arr2[0], Value::String("null".into()));
3601        assert_eq!(union_arr2[1], Value::String("int".into()));
3602        assert_eq!(union_arr2[2], Value::String("string".into()));
3603    }
3604
3605    #[test]
3606    fn test_project_empty_projection() {
3607        let schema_json = r#"{
3608            "type": "record",
3609            "name": "Test",
3610            "fields": [
3611                {"name": "a", "type": "int"},
3612                {"name": "b", "type": "string"}
3613            ]
3614        }"#;
3615        let schema = AvroSchema::new(schema_json.to_string());
3616        let projected = schema.project(&[]).unwrap();
3617        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3618        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3619        assert!(
3620            fields.is_empty(),
3621            "Empty projection should yield empty fields"
3622        );
3623    }
3624
3625    #[test]
3626    fn test_project_single_field() {
3627        let schema_json = r#"{
3628            "type": "record",
3629            "name": "Test",
3630            "fields": [
3631                {"name": "a", "type": "int"},
3632                {"name": "b", "type": "string"},
3633                {"name": "c", "type": "long"}
3634            ]
3635        }"#;
3636        let schema = AvroSchema::new(schema_json.to_string());
3637        let projected = schema.project(&[1]).unwrap();
3638        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3639        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3640        assert_eq!(fields.len(), 1);
3641        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("b"));
3642    }
3643
3644    #[test]
3645    fn test_project_multiple_fields() {
3646        let schema_json = r#"{
3647            "type": "record",
3648            "name": "Test",
3649            "fields": [
3650                {"name": "a", "type": "int"},
3651                {"name": "b", "type": "string"},
3652                {"name": "c", "type": "long"},
3653                {"name": "d", "type": "boolean"}
3654            ]
3655        }"#;
3656        let schema = AvroSchema::new(schema_json.to_string());
3657        let projected = schema.project(&[0, 2, 3]).unwrap();
3658        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3659        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3660        assert_eq!(fields.len(), 3);
3661        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
3662        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("c"));
3663        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("d"));
3664    }
3665
3666    #[test]
3667    fn test_project_all_fields() {
3668        let schema_json = r#"{
3669            "type": "record",
3670            "name": "Test",
3671            "fields": [
3672                {"name": "a", "type": "int"},
3673                {"name": "b", "type": "string"}
3674            ]
3675        }"#;
3676        let schema = AvroSchema::new(schema_json.to_string());
3677        let projected = schema.project(&[0, 1]).unwrap();
3678        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3679        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3680        assert_eq!(fields.len(), 2);
3681        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
3682        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("b"));
3683    }
3684
3685    #[test]
3686    fn test_project_reorder_fields() {
3687        let schema_json = r#"{
3688            "type": "record",
3689            "name": "Test",
3690            "fields": [
3691                {"name": "a", "type": "int"},
3692                {"name": "b", "type": "string"},
3693                {"name": "c", "type": "long"}
3694            ]
3695        }"#;
3696        let schema = AvroSchema::new(schema_json.to_string());
3697        // Project in reverse order
3698        let projected = schema.project(&[2, 0, 1]).unwrap();
3699        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3700        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3701        assert_eq!(fields.len(), 3);
3702        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("c"));
3703        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("a"));
3704        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("b"));
3705    }
3706
3707    #[test]
3708    fn test_project_preserves_record_metadata() {
3709        let schema_json = r#"{
3710            "type": "record",
3711            "name": "MyRecord",
3712            "namespace": "com.example",
3713            "doc": "A test record",
3714            "aliases": ["OldRecord"],
3715            "fields": [
3716                {"name": "a", "type": "int"},
3717                {"name": "b", "type": "string"}
3718            ]
3719        }"#;
3720        let schema = AvroSchema::new(schema_json.to_string());
3721        let projected = schema.project(&[0]).unwrap();
3722        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3723        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("MyRecord"));
3724        assert_eq!(
3725            v.get("namespace").and_then(|n| n.as_str()),
3726            Some("com.example")
3727        );
3728        assert_eq!(v.get("doc").and_then(|n| n.as_str()), Some("A test record"));
3729        assert!(v.get("aliases").is_some());
3730    }
3731
3732    #[test]
3733    fn test_project_preserves_field_metadata() {
3734        let schema_json = r#"{
3735            "type": "record",
3736            "name": "Test",
3737            "fields": [
3738                {"name": "a", "type": "int", "doc": "Field A", "default": 0},
3739                {"name": "b", "type": "string"}
3740            ]
3741        }"#;
3742        let schema = AvroSchema::new(schema_json.to_string());
3743        let projected = schema.project(&[0]).unwrap();
3744        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3745        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3746        assert_eq!(
3747            fields[0].get("doc").and_then(|d| d.as_str()),
3748            Some("Field A")
3749        );
3750        assert_eq!(fields[0].get("default").and_then(|d| d.as_i64()), Some(0));
3751    }
3752
3753    #[test]
3754    fn test_project_with_nested_record() {
3755        let schema_json = r#"{
3756            "type": "record",
3757            "name": "Outer",
3758            "fields": [
3759                {"name": "id", "type": "int"},
3760                {"name": "inner", "type": {
3761                    "type": "record",
3762                    "name": "Inner",
3763                    "fields": [
3764                        {"name": "x", "type": "int"},
3765                        {"name": "y", "type": "string"}
3766                    ]
3767                }},
3768                {"name": "value", "type": "double"}
3769            ]
3770        }"#;
3771        let schema = AvroSchema::new(schema_json.to_string());
3772        let projected = schema.project(&[1]).unwrap();
3773        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3774        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3775        assert_eq!(fields.len(), 1);
3776        assert_eq!(
3777            fields[0].get("name").and_then(|n| n.as_str()),
3778            Some("inner")
3779        );
3780        // Verify nested record structure is preserved
3781        let inner_type = fields[0].get("type").unwrap();
3782        assert_eq!(
3783            inner_type.get("type").and_then(|t| t.as_str()),
3784            Some("record")
3785        );
3786        assert_eq!(
3787            inner_type.get("name").and_then(|n| n.as_str()),
3788            Some("Inner")
3789        );
3790    }
3791
3792    #[test]
3793    fn test_project_with_complex_field_types() {
3794        let schema_json = r#"{
3795            "type": "record",
3796            "name": "Test",
3797            "fields": [
3798                {"name": "arr", "type": {"type": "array", "items": "int"}},
3799                {"name": "map", "type": {"type": "map", "values": "string"}},
3800                {"name": "union", "type": ["null", "int"]}
3801            ]
3802        }"#;
3803        let schema = AvroSchema::new(schema_json.to_string());
3804        let projected = schema.project(&[0, 2]).unwrap();
3805        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3806        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3807        assert_eq!(fields.len(), 2);
3808        // Verify array type is preserved
3809        let arr_type = fields[0].get("type").unwrap();
3810        assert_eq!(arr_type.get("type").and_then(|t| t.as_str()), Some("array"));
3811        // Verify union type is preserved
3812        let union_type = fields[1].get("type").unwrap();
3813        assert!(union_type.is_array());
3814    }
3815
3816    #[test]
3817    fn test_project_error_invalid_json() {
3818        let schema = AvroSchema::new("not valid json".to_string());
3819        let err = schema.project(&[0]).unwrap_err();
3820        let msg = err.to_string();
3821        assert!(
3822            msg.contains("Invalid Avro schema JSON"),
3823            "Expected parse error, got: {msg}"
3824        );
3825    }
3826
3827    #[test]
3828    fn test_project_error_not_object() {
3829        // Primitive type schema (not a JSON object)
3830        let schema = AvroSchema::new(r#""string""#.to_string());
3831        let err = schema.project(&[0]).unwrap_err();
3832        let msg = err.to_string();
3833        assert!(
3834            msg.contains("must be a JSON object"),
3835            "Expected object error, got: {msg}"
3836        );
3837    }
3838
3839    #[test]
3840    fn test_project_error_array_schema() {
3841        // Array (list) is a valid JSON but not a record
3842        let schema = AvroSchema::new(r#"["null", "int"]"#.to_string());
3843        let err = schema.project(&[0]).unwrap_err();
3844        let msg = err.to_string();
3845        assert!(
3846            msg.contains("must be a JSON object"),
3847            "Expected object error for array schema, got: {msg}"
3848        );
3849    }
3850
3851    #[test]
3852    fn test_project_error_type_not_record() {
3853        let schema_json = r#"{
3854            "type": "enum",
3855            "name": "Color",
3856            "symbols": ["RED", "GREEN", "BLUE"]
3857        }"#;
3858        let schema = AvroSchema::new(schema_json.to_string());
3859        let err = schema.project(&[0]).unwrap_err();
3860        let msg = err.to_string();
3861        assert!(
3862            msg.contains("must be an Avro record") && msg.contains("'enum'"),
3863            "Expected type mismatch error, got: {msg}"
3864        );
3865    }
3866
3867    #[test]
3868    fn test_project_error_type_array() {
3869        let schema_json = r#"{
3870            "type": "array",
3871            "items": "int"
3872        }"#;
3873        let schema = AvroSchema::new(schema_json.to_string());
3874        let err = schema.project(&[0]).unwrap_err();
3875        let msg = err.to_string();
3876        assert!(
3877            msg.contains("must be an Avro record") && msg.contains("'array'"),
3878            "Expected type mismatch error for array type, got: {msg}"
3879        );
3880    }
3881
3882    #[test]
3883    fn test_project_error_type_fixed() {
3884        let schema_json = r#"{
3885            "type": "fixed",
3886            "name": "MD5",
3887            "size": 16
3888        }"#;
3889        let schema = AvroSchema::new(schema_json.to_string());
3890        let err = schema.project(&[0]).unwrap_err();
3891        let msg = err.to_string();
3892        assert!(
3893            msg.contains("must be an Avro record") && msg.contains("'fixed'"),
3894            "Expected type mismatch error for fixed type, got: {msg}"
3895        );
3896    }
3897
3898    #[test]
3899    fn test_project_error_type_map() {
3900        let schema_json = r#"{
3901            "type": "map",
3902            "values": "string"
3903        }"#;
3904        let schema = AvroSchema::new(schema_json.to_string());
3905        let err = schema.project(&[0]).unwrap_err();
3906        let msg = err.to_string();
3907        assert!(
3908            msg.contains("must be an Avro record") && msg.contains("'map'"),
3909            "Expected type mismatch error for map type, got: {msg}"
3910        );
3911    }
3912
3913    #[test]
3914    fn test_project_error_missing_type_field() {
3915        let schema_json = r#"{
3916            "name": "Test",
3917            "fields": [{"name": "a", "type": "int"}]
3918        }"#;
3919        let schema = AvroSchema::new(schema_json.to_string());
3920        let err = schema.project(&[0]).unwrap_err();
3921        let msg = err.to_string();
3922        assert!(
3923            msg.contains("missing required 'type' field"),
3924            "Expected missing type error, got: {msg}"
3925        );
3926    }
3927
3928    #[test]
3929    fn test_project_error_missing_fields() {
3930        let schema_json = r#"{
3931            "type": "record",
3932            "name": "Test"
3933        }"#;
3934        let schema = AvroSchema::new(schema_json.to_string());
3935        let err = schema.project(&[0]).unwrap_err();
3936        let msg = err.to_string();
3937        assert!(
3938            msg.contains("missing required 'fields'"),
3939            "Expected missing fields error, got: {msg}"
3940        );
3941    }
3942
3943    #[test]
3944    fn test_project_error_fields_not_array() {
3945        let schema_json = r#"{
3946            "type": "record",
3947            "name": "Test",
3948            "fields": "not an array"
3949        }"#;
3950        let schema = AvroSchema::new(schema_json.to_string());
3951        let err = schema.project(&[0]).unwrap_err();
3952        let msg = err.to_string();
3953        assert!(
3954            msg.contains("'fields' must be an array"),
3955            "Expected fields array error, got: {msg}"
3956        );
3957    }
3958
3959    #[test]
3960    fn test_project_error_index_out_of_bounds() {
3961        let schema_json = r#"{
3962            "type": "record",
3963            "name": "Test",
3964            "fields": [
3965                {"name": "a", "type": "int"},
3966                {"name": "b", "type": "string"}
3967            ]
3968        }"#;
3969        let schema = AvroSchema::new(schema_json.to_string());
3970        let err = schema.project(&[5]).unwrap_err();
3971        let msg = err.to_string();
3972        assert!(
3973            msg.contains("out of bounds") && msg.contains("5") && msg.contains("2"),
3974            "Expected out of bounds error, got: {msg}"
3975        );
3976    }
3977
3978    #[test]
3979    fn test_project_error_index_out_of_bounds_edge() {
3980        let schema_json = r#"{
3981            "type": "record",
3982            "name": "Test",
3983            "fields": [
3984                {"name": "a", "type": "int"}
3985            ]
3986        }"#;
3987        let schema = AvroSchema::new(schema_json.to_string());
3988        // Index 1 is just out of bounds for a 1-element array
3989        let err = schema.project(&[1]).unwrap_err();
3990        let msg = err.to_string();
3991        assert!(
3992            msg.contains("out of bounds") && msg.contains("1"),
3993            "Expected out of bounds error for edge case, got: {msg}"
3994        );
3995    }
3996
3997    #[test]
3998    fn test_project_error_duplicate_index() {
3999        let schema_json = r#"{
4000            "type": "record",
4001            "name": "Test",
4002            "fields": [
4003                {"name": "a", "type": "int"},
4004                {"name": "b", "type": "string"},
4005                {"name": "c", "type": "long"}
4006            ]
4007        }"#;
4008        let schema = AvroSchema::new(schema_json.to_string());
4009        let err = schema.project(&[0, 1, 0]).unwrap_err();
4010        let msg = err.to_string();
4011        assert!(
4012            msg.contains("Duplicate projection index") && msg.contains("0"),
4013            "Expected duplicate index error, got: {msg}"
4014        );
4015    }
4016
4017    #[test]
4018    fn test_project_error_duplicate_index_consecutive() {
4019        let schema_json = r#"{
4020            "type": "record",
4021            "name": "Test",
4022            "fields": [
4023                {"name": "a", "type": "int"},
4024                {"name": "b", "type": "string"}
4025            ]
4026        }"#;
4027        let schema = AvroSchema::new(schema_json.to_string());
4028        let err = schema.project(&[1, 1]).unwrap_err();
4029        let msg = err.to_string();
4030        assert!(
4031            msg.contains("Duplicate projection index") && msg.contains("1"),
4032            "Expected duplicate index error for consecutive duplicates, got: {msg}"
4033        );
4034    }
4035
4036    #[test]
4037    fn test_project_with_empty_fields() {
4038        let schema_json = r#"{
4039            "type": "record",
4040            "name": "EmptyRecord",
4041            "fields": []
4042        }"#;
4043        let schema = AvroSchema::new(schema_json.to_string());
4044        // Projecting empty from empty should succeed
4045        let projected = schema.project(&[]).unwrap();
4046        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
4047        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
4048        assert!(fields.is_empty());
4049    }
4050
4051    #[test]
4052    fn test_project_empty_fields_index_out_of_bounds() {
4053        let schema_json = r#"{
4054            "type": "record",
4055            "name": "EmptyRecord",
4056            "fields": []
4057        }"#;
4058        let schema = AvroSchema::new(schema_json.to_string());
4059        let err = schema.project(&[0]).unwrap_err();
4060        let msg = err.to_string();
4061        assert!(
4062            msg.contains("out of bounds") && msg.contains("0 fields"),
4063            "Expected out of bounds error for empty record, got: {msg}"
4064        );
4065    }
4066
4067    #[test]
4068    fn test_project_result_is_valid_avro_schema() {
4069        let schema_json = r#"{
4070            "type": "record",
4071            "name": "Test",
4072            "namespace": "com.example",
4073            "fields": [
4074                {"name": "id", "type": "long"},
4075                {"name": "name", "type": "string"},
4076                {"name": "active", "type": "boolean"}
4077            ]
4078        }"#;
4079        let schema = AvroSchema::new(schema_json.to_string());
4080        let projected = schema.project(&[0, 2]).unwrap();
4081        // Verify the projected schema can be parsed as a valid Avro schema
4082        let parsed = projected.schema();
4083        assert!(parsed.is_ok(), "Projected schema should be valid Avro");
4084        match parsed.unwrap() {
4085            Schema::Complex(ComplexType::Record(r)) => {
4086                assert_eq!(r.name, "Test");
4087                assert_eq!(r.namespace, Some("com.example"));
4088                assert_eq!(r.fields.len(), 2);
4089                assert_eq!(r.fields[0].name, "id");
4090                assert_eq!(r.fields[1].name, "active");
4091            }
4092            _ => panic!("Expected Record schema"),
4093        }
4094    }
4095
4096    #[test]
4097    fn test_project_non_contiguous_indices() {
4098        let schema_json = r#"{
4099            "type": "record",
4100            "name": "Test",
4101            "fields": [
4102                {"name": "f0", "type": "int"},
4103                {"name": "f1", "type": "int"},
4104                {"name": "f2", "type": "int"},
4105                {"name": "f3", "type": "int"},
4106                {"name": "f4", "type": "int"}
4107            ]
4108        }"#;
4109        let schema = AvroSchema::new(schema_json.to_string());
4110        // Select every other field
4111        let projected = schema.project(&[0, 2, 4]).unwrap();
4112        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
4113        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
4114        assert_eq!(fields.len(), 3);
4115        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f0"));
4116        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("f2"));
4117        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("f4"));
4118    }
4119
4120    #[test]
4121    fn test_project_single_field_from_many() {
4122        let schema_json = r#"{
4123            "type": "record",
4124            "name": "BigRecord",
4125            "fields": [
4126                {"name": "f0", "type": "int"},
4127                {"name": "f1", "type": "int"},
4128                {"name": "f2", "type": "int"},
4129                {"name": "f3", "type": "int"},
4130                {"name": "f4", "type": "int"},
4131                {"name": "f5", "type": "int"},
4132                {"name": "f6", "type": "int"},
4133                {"name": "f7", "type": "int"},
4134                {"name": "f8", "type": "int"},
4135                {"name": "f9", "type": "int"}
4136            ]
4137        }"#;
4138        let schema = AvroSchema::new(schema_json.to_string());
4139        // Select only the last field
4140        let projected = schema.project(&[9]).unwrap();
4141        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
4142        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
4143        assert_eq!(fields.len(), 1);
4144        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f9"));
4145    }
4146}