Skip to main content

polars_arrow/io/avro/read/
schema.rs

1use avro_schema::schema::{Enum, Fixed, Record, Schema as AvroSchema};
2use polars_error::{PolarsResult, polars_bail};
3use polars_utils::pl_str::PlSmallStr;
4
5use crate::datatypes::*;
6
7fn external_props(schema: &AvroSchema) -> Metadata {
8    let mut props = Metadata::new();
9    match schema {
10        AvroSchema::Record(Record { doc: Some(doc), .. })
11        | AvroSchema::Enum(Enum { doc: Some(doc), .. }) => {
12            props.insert(
13                PlSmallStr::from_static("avro::doc"),
14                PlSmallStr::from_str(doc.as_str()),
15            );
16        },
17        _ => {},
18    }
19    props
20}
21
22/// Infers an [`ArrowSchema`] from the root [`Record`].
23/// This
24pub fn infer_schema(record: &Record) -> PolarsResult<ArrowSchema> {
25    record
26        .fields
27        .iter()
28        .map(|field| {
29            let field = schema_to_field(
30                &field.schema,
31                Some(&field.name),
32                external_props(&field.schema),
33            )?;
34
35            Ok((field.name.clone(), field))
36        })
37        .collect::<PolarsResult<ArrowSchema>>()
38}
39
40fn schema_to_field(
41    schema: &AvroSchema,
42    name: Option<&str>,
43    props: Metadata,
44) -> PolarsResult<Field> {
45    let mut nullable = false;
46    let dtype = match schema {
47        AvroSchema::Null => ArrowDataType::Null,
48        AvroSchema::Boolean => ArrowDataType::Boolean,
49        AvroSchema::Int(logical) => match logical {
50            Some(logical) => match logical {
51                avro_schema::schema::IntLogical::Date => ArrowDataType::Date32,
52                avro_schema::schema::IntLogical::Time => {
53                    ArrowDataType::Time32(TimeUnit::Millisecond)
54                },
55            },
56            None => ArrowDataType::Int32,
57        },
58        AvroSchema::Long(logical) => match logical {
59            Some(logical) => match logical {
60                avro_schema::schema::LongLogical::Time => {
61                    ArrowDataType::Time64(TimeUnit::Microsecond)
62                },
63                avro_schema::schema::LongLogical::TimestampMillis => ArrowDataType::Timestamp(
64                    TimeUnit::Millisecond,
65                    Some(PlSmallStr::from_static("00:00")),
66                ),
67                avro_schema::schema::LongLogical::TimestampMicros => ArrowDataType::Timestamp(
68                    TimeUnit::Microsecond,
69                    Some(PlSmallStr::from_static("00:00")),
70                ),
71                avro_schema::schema::LongLogical::LocalTimestampMillis => {
72                    ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
73                },
74                avro_schema::schema::LongLogical::LocalTimestampMicros => {
75                    ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
76                },
77            },
78            None => ArrowDataType::Int64,
79        },
80        AvroSchema::Float => ArrowDataType::Float32,
81        AvroSchema::Double => ArrowDataType::Float64,
82        AvroSchema::Bytes(logical) => match logical {
83            Some(logical) => match logical {
84                avro_schema::schema::BytesLogical::Decimal(precision, scale) => {
85                    ArrowDataType::Decimal(*precision, *scale)
86                },
87            },
88            None => ArrowDataType::Binary,
89        },
90        AvroSchema::String(_) => ArrowDataType::Utf8,
91        AvroSchema::Array(item_schema) => ArrowDataType::List(Box::new(schema_to_field(
92            item_schema,
93            Some("item"), // default name for list items
94            Metadata::default(),
95        )?)),
96        AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"),
97        AvroSchema::Union(schemas) => {
98            // If there are only two variants and one of them is null, set the other type as the field data type
99            let has_nullable = schemas.iter().any(|x| x == &AvroSchema::Null);
100            if has_nullable && schemas.len() == 2 {
101                nullable = true;
102                if let Some(schema) = schemas
103                    .iter()
104                    .find(|&schema| !matches!(schema, AvroSchema::Null))
105                {
106                    schema_to_field(schema, None, Metadata::default())?.dtype
107                } else {
108                    polars_bail!(nyi = "Can't read avro union {schema:?}");
109                }
110            } else {
111                let fields = schemas
112                    .iter()
113                    .map(|s| schema_to_field(s, None, Metadata::default()))
114                    .collect::<PolarsResult<Vec<Field>>>()?;
115                ArrowDataType::Union(Box::new(UnionType {
116                    fields,
117                    ids: None,
118                    mode: UnionMode::Dense,
119                }))
120            }
121        },
122        AvroSchema::Record(Record { fields, .. }) => {
123            let fields = fields
124                .iter()
125                .map(|field| {
126                    let mut props = Metadata::new();
127                    if let Some(doc) = &field.doc {
128                        props.insert(
129                            PlSmallStr::from_static("avro::doc"),
130                            PlSmallStr::from_str(doc),
131                        );
132                    }
133                    schema_to_field(&field.schema, Some(&field.name), props)
134                })
135                .collect::<PolarsResult<_>>()?;
136            ArrowDataType::Struct(fields)
137        },
138        AvroSchema::Enum { .. } => {
139            return Ok(Field::new(
140                PlSmallStr::from_str(name.unwrap_or_default()),
141                ArrowDataType::Dictionary(IntegerType::Int32, Box::new(ArrowDataType::Utf8), false),
142                false,
143            ));
144        },
145        AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical {
146            Some(logical) => match logical {
147                avro_schema::schema::FixedLogical::Decimal(precision, scale) => {
148                    ArrowDataType::Decimal(*precision, *scale)
149                },
150                avro_schema::schema::FixedLogical::Duration => {
151                    ArrowDataType::Interval(IntervalUnit::MonthDayNano)
152                },
153            },
154            None => ArrowDataType::FixedSizeBinary(*size),
155        },
156    };
157
158    let name = name.unwrap_or_default();
159
160    Ok(Field::new(PlSmallStr::from_str(name), dtype, nullable).with_metadata(props))
161}