polars_arrow/io/avro/read/
schema.rs1use avro_schema::schema::{Enum, Fixed, Record, Schema as AvroSchema};
2use polars_error::{PolarsResult, polars_bail};
3use polars_utils::pl_str::PlSmallStr;
4
5use crate::datatypes::*;
6
7fn external_props(schema: &AvroSchema) -> Metadata {
8 let mut props = Metadata::new();
9 match schema {
10 AvroSchema::Record(Record { doc: Some(doc), .. })
11 | AvroSchema::Enum(Enum { doc: Some(doc), .. }) => {
12 props.insert(
13 PlSmallStr::from_static("avro::doc"),
14 PlSmallStr::from_str(doc.as_str()),
15 );
16 },
17 _ => {},
18 }
19 props
20}
21
22pub fn infer_schema(record: &Record) -> PolarsResult<ArrowSchema> {
25 record
26 .fields
27 .iter()
28 .map(|field| {
29 let field = schema_to_field(
30 &field.schema,
31 Some(&field.name),
32 external_props(&field.schema),
33 )?;
34
35 Ok((field.name.clone(), field))
36 })
37 .collect::<PolarsResult<ArrowSchema>>()
38}
39
40fn schema_to_field(
41 schema: &AvroSchema,
42 name: Option<&str>,
43 props: Metadata,
44) -> PolarsResult<Field> {
45 let mut nullable = false;
46 let dtype = match schema {
47 AvroSchema::Null => ArrowDataType::Null,
48 AvroSchema::Boolean => ArrowDataType::Boolean,
49 AvroSchema::Int(logical) => match logical {
50 Some(logical) => match logical {
51 avro_schema::schema::IntLogical::Date => ArrowDataType::Date32,
52 avro_schema::schema::IntLogical::Time => {
53 ArrowDataType::Time32(TimeUnit::Millisecond)
54 },
55 },
56 None => ArrowDataType::Int32,
57 },
58 AvroSchema::Long(logical) => match logical {
59 Some(logical) => match logical {
60 avro_schema::schema::LongLogical::Time => {
61 ArrowDataType::Time64(TimeUnit::Microsecond)
62 },
63 avro_schema::schema::LongLogical::TimestampMillis => ArrowDataType::Timestamp(
64 TimeUnit::Millisecond,
65 Some(PlSmallStr::from_static("00:00")),
66 ),
67 avro_schema::schema::LongLogical::TimestampMicros => ArrowDataType::Timestamp(
68 TimeUnit::Microsecond,
69 Some(PlSmallStr::from_static("00:00")),
70 ),
71 avro_schema::schema::LongLogical::LocalTimestampMillis => {
72 ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
73 },
74 avro_schema::schema::LongLogical::LocalTimestampMicros => {
75 ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
76 },
77 },
78 None => ArrowDataType::Int64,
79 },
80 AvroSchema::Float => ArrowDataType::Float32,
81 AvroSchema::Double => ArrowDataType::Float64,
82 AvroSchema::Bytes(logical) => match logical {
83 Some(logical) => match logical {
84 avro_schema::schema::BytesLogical::Decimal(precision, scale) => {
85 ArrowDataType::Decimal(*precision, *scale)
86 },
87 },
88 None => ArrowDataType::Binary,
89 },
90 AvroSchema::String(_) => ArrowDataType::Utf8,
91 AvroSchema::Array(item_schema) => ArrowDataType::List(Box::new(schema_to_field(
92 item_schema,
93 Some("item"), Metadata::default(),
95 )?)),
96 AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"),
97 AvroSchema::Union(schemas) => {
98 let has_nullable = schemas.iter().any(|x| x == &AvroSchema::Null);
100 if has_nullable && schemas.len() == 2 {
101 nullable = true;
102 if let Some(schema) = schemas
103 .iter()
104 .find(|&schema| !matches!(schema, AvroSchema::Null))
105 {
106 schema_to_field(schema, None, Metadata::default())?.dtype
107 } else {
108 polars_bail!(nyi = "Can't read avro union {schema:?}");
109 }
110 } else {
111 let fields = schemas
112 .iter()
113 .map(|s| schema_to_field(s, None, Metadata::default()))
114 .collect::<PolarsResult<Vec<Field>>>()?;
115 ArrowDataType::Union(Box::new(UnionType {
116 fields,
117 ids: None,
118 mode: UnionMode::Dense,
119 }))
120 }
121 },
122 AvroSchema::Record(Record { fields, .. }) => {
123 let fields = fields
124 .iter()
125 .map(|field| {
126 let mut props = Metadata::new();
127 if let Some(doc) = &field.doc {
128 props.insert(
129 PlSmallStr::from_static("avro::doc"),
130 PlSmallStr::from_str(doc),
131 );
132 }
133 schema_to_field(&field.schema, Some(&field.name), props)
134 })
135 .collect::<PolarsResult<_>>()?;
136 ArrowDataType::Struct(fields)
137 },
138 AvroSchema::Enum { .. } => {
139 return Ok(Field::new(
140 PlSmallStr::from_str(name.unwrap_or_default()),
141 ArrowDataType::Dictionary(IntegerType::Int32, Box::new(ArrowDataType::Utf8), false),
142 false,
143 ));
144 },
145 AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical {
146 Some(logical) => match logical {
147 avro_schema::schema::FixedLogical::Decimal(precision, scale) => {
148 ArrowDataType::Decimal(*precision, *scale)
149 },
150 avro_schema::schema::FixedLogical::Duration => {
151 ArrowDataType::Interval(IntervalUnit::MonthDayNano)
152 },
153 },
154 None => ArrowDataType::FixedSizeBinary(*size),
155 },
156 };
157
158 let name = name.unwrap_or_default();
159
160 Ok(Field::new(PlSmallStr::from_str(name), dtype, nullable).with_metadata(props))
161}