// datafusion_proto_common/from_proto/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    i256, DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
29    Schema, TimeUnit, UnionFields, UnionMode,
30};
31use arrow::ipc::{reader::read_record_batch, root_as_message};
32
33use datafusion_common::{
34    arrow_datafusion_err,
35    config::{
36        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
37        TableParquetOptions,
38    },
39    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
40    not_impl_err,
41    parsers::CompressionTypeVariant,
42    plan_datafusion_err,
43    stats::Precision,
44    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
45    DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
46};
47
/// Errors that can occur while converting protobuf messages back into
/// native DataFusion / arrow types.
#[derive(Debug)]
pub enum Error {
    /// Catch-all error carrying a human-readable description.
    General(String),

    /// Wrapper around an error raised by DataFusion itself.
    DataFusionError(DataFusionError),

    /// A protobuf field that is mandatory for the conversion was not set.
    MissingRequiredField(String),

    /// A repeated field that must contain at least one element was empty.
    AtLeastOneValue(String),

    /// An `i32` read from the wire does not correspond to any variant of
    /// the named protobuf enum.
    UnknownEnumVariant { name: String, value: i32 },
}
60
61impl std::fmt::Display for Error {
62    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
63        match self {
64            Self::General(desc) => write!(f, "General error: {desc}"),
65
66            Self::DataFusionError(desc) => {
67                write!(f, "DataFusion error: {desc:?}")
68            }
69
70            Self::MissingRequiredField(name) => {
71                write!(f, "Missing required field {name}")
72            }
73            Self::AtLeastOneValue(name) => {
74                write!(f, "Must have at least one {name}, found 0")
75            }
76            Self::UnknownEnumVariant { name, value } => {
77                write!(f, "Unknown i32 value for {name} enum: {value}")
78            }
79        }
80    }
81}
82
// `Debug` + `Display` are already implemented above, so the std error
// trait's default method implementations suffice here.
impl std::error::Error for Error {}
84
85impl From<DataFusionError> for Error {
86    fn from(e: DataFusionError) -> Self {
87        Error::DataFusionError(e)
88    }
89}
90
91impl Error {
92    pub fn required(field: impl Into<String>) -> Error {
93        Error::MissingRequiredField(field.into())
94    }
95
96    pub fn unknown(name: impl Into<String>, value: i32) -> Error {
97        Error::UnknownEnumVariant {
98            name: name.into(),
99            value,
100        }
101    }
102}
103
impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        // Surface deserialization failures as planning errors so callers see
        // them through the standard DataFusion error machinery.
        plan_datafusion_err!("{}", e)
    }
}
109
/// An extension trait that adds the methods `optional` and `required` to any
/// Option containing a type implementing `TryInto<U, Error = Error>`
///
/// This mirrors how protobuf models message fields: the presence check and
/// the inner conversion are handled in a single call.
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field to an option of a different type
    ///
    /// Returns None if the option is None, otherwise calls [`TryInto::try_into`]
    /// on the contained data, returning any error encountered
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field to a different type, returning an error if None
    ///
    /// Returns `Error::MissingRequiredField` if None, otherwise calls [`TryInto::try_into`]
    /// on the contained data, returning any error encountered
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
125
126impl<T, U> FromOptionalField<U> for Option<T>
127where
128    T: TryInto<U, Error = Error>,
129{
130    fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
131        self.map(|t| t.try_into()).transpose()
132    }
133
134    fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
135        match self {
136            None => Err(Error::required(field)),
137            Some(t) => t.try_into(),
138        }
139    }
140}
141
142impl From<protobuf::Column> for Column {
143    fn from(c: protobuf::Column) -> Self {
144        let protobuf::Column { relation, name } = c;
145
146        Self::new(relation.map(|r| r.relation), name)
147    }
148}
149
150impl From<&protobuf::Column> for Column {
151    fn from(c: &protobuf::Column) -> Self {
152        c.clone().into()
153    }
154}
155
156impl TryFrom<&protobuf::DfSchema> for DFSchema {
157    type Error = Error;
158
159    fn try_from(
160        df_schema: &protobuf::DfSchema,
161    ) -> datafusion_common::Result<Self, Self::Error> {
162        let df_fields = df_schema.columns.clone();
163        let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
164            .iter()
165            .map(|df_field| {
166                let field: Field = df_field.field.as_ref().required("field")?;
167                Ok((
168                    df_field
169                        .qualifier
170                        .as_ref()
171                        .map(|q| q.relation.clone().into()),
172                    Arc::new(field),
173                ))
174            })
175            .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
176
177        Ok(DFSchema::new_with_metadata(
178            qualifiers_and_fields,
179            df_schema.metadata.clone(),
180        )?)
181    }
182}
183
184impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
185    type Error = Error;
186
187    fn try_from(
188        df_schema: protobuf::DfSchema,
189    ) -> datafusion_common::Result<Self, Self::Error> {
190        let dfschema: DFSchema = (&df_schema).try_into()?;
191        Ok(Arc::new(dfschema))
192    }
193}
194
195impl TryFrom<&protobuf::ArrowType> for DataType {
196    type Error = Error;
197
198    fn try_from(
199        arrow_type: &protobuf::ArrowType,
200    ) -> datafusion_common::Result<Self, Self::Error> {
201        arrow_type
202            .arrow_type_enum
203            .as_ref()
204            .required("arrow_type_enum")
205    }
206}
207
impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    /// Maps every protobuf `ArrowTypeEnum` variant onto the corresponding
    /// arrow [`DataType`], recursing into nested messages where a variant
    /// carries them (lists, structs, unions, dictionaries, maps).
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            // An empty timezone string on the wire encodes "no timezone".
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            // Precision/scale travel as wider protobuf integers and are
            // narrowed with `as` casts to the arrow u8/i8 representation.
            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
                precision,
                scale,
            }) => DataType::Decimal32(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
                precision,
                scale,
            }) => DataType::Decimal64(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            // List variants all require the nested element field message.
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // Default to index based type ids if not provided
                let type_ids: Vec<_> = match union.type_ids.is_empty() {
                    true => (0..union_fields.len() as i8).collect(),
                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
                };

                DataType::Union(UnionFields::new(type_ids, union_fields), union_mode)
            }
            // Dictionary key and value types are both mandatory nested types.
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
        })
    }
}
327
328impl TryFrom<&protobuf::Field> for Field {
329    type Error = Error;
330    fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
331        let datatype = field.arrow_type.as_deref().required("arrow_type")?;
332        let field = Self::new(field.name.as_str(), datatype, field.nullable)
333            .with_metadata(field.metadata.clone());
334        Ok(field)
335    }
336}
337
338impl TryFrom<&protobuf::Schema> for Schema {
339    type Error = Error;
340
341    fn try_from(
342        schema: &protobuf::Schema,
343    ) -> datafusion_common::Result<Self, Self::Error> {
344        let fields = schema
345            .columns
346            .iter()
347            .map(Field::try_from)
348            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
349        Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
350    }
351}
352
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    /// Decodes a protobuf scalar into a native [`ScalarValue`].
    ///
    /// Primitive variants map 1:1. Nested variants (list / large list /
    /// fixed-size list / struct / map) are transported as an arrow IPC
    /// record batch plus optional dictionary batches, which are decoded
    /// here via `read_record_batch`. Returns `Error::required("value")`
    /// when the `value` oneof is unset.
    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            // 8/16-bit integers travel as wider protobuf ints; narrow them.
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // ScalarValue::List is serialized using arrow IPC format
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                // The embedded schema is mandatory for nested values.
                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing ScalarValue::List"
                            .to_string(),
                    ));
                };

                // Parse the flatbuffer IPC message header for the batch.
                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing ScalarValue::List: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing ScalarValue::List"
                            .to_string(),
                    )
                })?;

                // Decode each serialized dictionary batch into an id -> values
                // map so dictionary-encoded columns can be reconstructed.
                let dict_by_id: HashMap<i64,ArrayRef> = dictionaries.iter().map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                        Error::General(format!(
                            "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
                        ))
                    })?;
                    let buffer = Buffer::from(arrow_data.as_slice());

                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing ScalarValue::List dictionary message"
                                .to_string(),
                        )
                    })?;

                    let id = dict_batch.id();

                    let record_batch = read_record_batch(
                        &buffer,
                        dict_batch.data().unwrap(),
                        Arc::new(schema.clone()),
                        &Default::default(),
                        None,
                        &message.version(),
                    )?;

                    // The dictionary values are the batch's single column.
                    let values: ArrayRef = Arc::clone(record_batch.column(0));

                    Ok((id, values))
                }).collect::<datafusion_common::Result<HashMap<_, _>>>()?;

                // Decode the main batch, resolving dictionaries by id.
                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
                let arr = record_batch.column(0);
                // Downcast the single column to the variant-specific array.
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // Outer match already restricted `value` to these variants.
                    _ => unreachable!(),
                }
            }
            Value::NullValue(v) => {
                // A typed null: decode the data type, then build its null scalar.
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            Value::Decimal32Value(_val) => {
                return not_impl_err!("Decimal32 protobuf deserialization")
                    .map_err(Error::DataFusionError)
            }
            Value::Decimal64Value(_val) => {
                return not_impl_err!("Decimal64 protobuf deserialization")
                    .map_err(Error::DataFusionError)
            }
            // Decimal payloads are big-endian byte arrays of the integer value.
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // Empty timezone string on the wire means "no timezone".
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                // Both the key (index) type and the inner value are required.
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                // Wire encoding: 0 = sparse, 1 = dense; anything else is invalid.
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // collect::<Option<Vec<_>>> fails if any field message is unset.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let fields = UnionFields::new(ids, fields);
                let v_id = val.value_id as i8;
                // The active value is optional; decode it recursively if present.
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                Self::FixedSizeBinary(v.length, Some(v.clone().values))
            }
        })
    }
}
623
624impl From<protobuf::TimeUnit> for TimeUnit {
625    fn from(time_unit: protobuf::TimeUnit) -> Self {
626        match time_unit {
627            protobuf::TimeUnit::Second => TimeUnit::Second,
628            protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
629            protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
630            protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
631        }
632    }
633}
634
635impl From<protobuf::IntervalUnit> for IntervalUnit {
636    fn from(interval_unit: protobuf::IntervalUnit) -> Self {
637        match interval_unit {
638            protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
639            protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
640            protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
641        }
642    }
643}
644
645impl From<protobuf::Constraints> for Constraints {
646    fn from(constraints: protobuf::Constraints) -> Self {
647        Constraints::new_unverified(
648            constraints
649                .constraints
650                .into_iter()
651                .map(|item| item.into())
652                .collect(),
653        )
654    }
655}
656
657impl From<protobuf::Constraint> for Constraint {
658    fn from(value: protobuf::Constraint) -> Self {
659        match value.constraint_mode.unwrap() {
660            protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
661                Constraint::PrimaryKey(
662                    elem.indices.into_iter().map(|item| item as usize).collect(),
663                )
664            }
665            protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
666                elem.indices.into_iter().map(|item| item as usize).collect(),
667            ),
668        }
669    }
670}
671
672impl From<&protobuf::ColumnStats> for ColumnStatistics {
673    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
674        ColumnStatistics {
675            null_count: if let Some(nc) = &cs.null_count {
676                nc.clone().into()
677            } else {
678                Precision::Absent
679            },
680            max_value: if let Some(max) = &cs.max_value {
681                max.clone().into()
682            } else {
683                Precision::Absent
684            },
685            min_value: if let Some(min) = &cs.min_value {
686                min.clone().into()
687            } else {
688                Precision::Absent
689            },
690            sum_value: if let Some(sum) = &cs.sum_value {
691                sum.clone().into()
692            } else {
693                Precision::Absent
694            },
695            distinct_count: if let Some(dc) = &cs.distinct_count {
696                dc.clone().into()
697            } else {
698                Precision::Absent
699            },
700        }
701    }
702}
703
704impl From<protobuf::Precision> for Precision<usize> {
705    fn from(s: protobuf::Precision) -> Self {
706        let Ok(precision_type) = s.precision_info.try_into() else {
707            return Precision::Absent;
708        };
709        match precision_type {
710            protobuf::PrecisionInfo::Exact => {
711                if let Some(val) = s.val {
712                    if let Ok(ScalarValue::UInt64(Some(val))) =
713                        ScalarValue::try_from(&val)
714                    {
715                        Precision::Exact(val as usize)
716                    } else {
717                        Precision::Absent
718                    }
719                } else {
720                    Precision::Absent
721                }
722            }
723            protobuf::PrecisionInfo::Inexact => {
724                if let Some(val) = s.val {
725                    if let Ok(ScalarValue::UInt64(Some(val))) =
726                        ScalarValue::try_from(&val)
727                    {
728                        Precision::Inexact(val as usize)
729                    } else {
730                        Precision::Absent
731                    }
732                } else {
733                    Precision::Absent
734                }
735            }
736            protobuf::PrecisionInfo::Absent => Precision::Absent,
737        }
738    }
739}
740
741impl From<protobuf::Precision> for Precision<ScalarValue> {
742    fn from(s: protobuf::Precision) -> Self {
743        let Ok(precision_type) = s.precision_info.try_into() else {
744            return Precision::Absent;
745        };
746        match precision_type {
747            protobuf::PrecisionInfo::Exact => {
748                if let Some(val) = s.val {
749                    if let Ok(val) = ScalarValue::try_from(&val) {
750                        Precision::Exact(val)
751                    } else {
752                        Precision::Absent
753                    }
754                } else {
755                    Precision::Absent
756                }
757            }
758            protobuf::PrecisionInfo::Inexact => {
759                if let Some(val) = s.val {
760                    if let Ok(val) = ScalarValue::try_from(&val) {
761                        Precision::Inexact(val)
762                    } else {
763                        Precision::Absent
764                    }
765                } else {
766                    Precision::Absent
767                }
768            }
769            protobuf::PrecisionInfo::Absent => Precision::Absent,
770        }
771    }
772}
773
774impl From<protobuf::JoinSide> for JoinSide {
775    fn from(t: protobuf::JoinSide) -> Self {
776        match t {
777            protobuf::JoinSide::LeftSide => JoinSide::Left,
778            protobuf::JoinSide::RightSide => JoinSide::Right,
779            protobuf::JoinSide::None => JoinSide::None,
780        }
781    }
782}
783
784impl From<&protobuf::Constraint> for Constraint {
785    fn from(value: &protobuf::Constraint) -> Self {
786        match &value.constraint_mode {
787            Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
788                Constraint::PrimaryKey(
789                    elem.indices.iter().map(|&item| item as usize).collect(),
790                )
791            }
792            Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
793                Constraint::Unique(
794                    elem.indices.iter().map(|&item| item as usize).collect(),
795                )
796            }
797            None => panic!("constraint_mode not set"),
798        }
799    }
800}
801
802impl TryFrom<&protobuf::Constraints> for Constraints {
803    type Error = DataFusionError;
804
805    fn try_from(
806        constraints: &protobuf::Constraints,
807    ) -> datafusion_common::Result<Self, Self::Error> {
808        Ok(Constraints::new_unverified(
809            constraints
810                .constraints
811                .iter()
812                .map(|item| item.into())
813                .collect(),
814        ))
815    }
816}
817
818impl TryFrom<&protobuf::Statistics> for Statistics {
819    type Error = DataFusionError;
820
821    fn try_from(
822        s: &protobuf::Statistics,
823    ) -> datafusion_common::Result<Self, Self::Error> {
824        // Keep it sync with Statistics::to_proto
825        Ok(Statistics {
826            num_rows: if let Some(nr) = &s.num_rows {
827                nr.clone().into()
828            } else {
829                Precision::Absent
830            },
831            total_byte_size: if let Some(tbs) = &s.total_byte_size {
832                tbs.clone().into()
833            } else {
834                Precision::Absent
835            },
836            // No column statistic (None) is encoded with empty array
837            column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
838        })
839    }
840}
841
842impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
843    fn from(value: protobuf::CompressionTypeVariant) -> Self {
844        match value {
845            protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
846            protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
847            protobuf::CompressionTypeVariant::Xz => Self::XZ,
848            protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
849            protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
850        }
851    }
852}
853
854impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
855    fn from(value: CompressionTypeVariant) -> Self {
856        match value {
857            CompressionTypeVariant::GZIP => Self::Gzip,
858            CompressionTypeVariant::BZIP2 => Self::Bzip2,
859            CompressionTypeVariant::XZ => Self::Xz,
860            CompressionTypeVariant::ZSTD => Self::Zstd,
861            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
862        }
863    }
864}
865
866impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
867    type Error = DataFusionError;
868
869    fn try_from(
870        opts: &protobuf::CsvWriterOptions,
871    ) -> datafusion_common::Result<Self, Self::Error> {
872        let write_options = csv_writer_options_from_proto(opts)?;
873        let compression: CompressionTypeVariant = opts.compression().into();
874        Ok(CsvWriterOptions::new(write_options, compression))
875    }
876}
877
878impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
879    type Error = DataFusionError;
880
881    fn try_from(
882        opts: &protobuf::JsonWriterOptions,
883    ) -> datafusion_common::Result<Self, Self::Error> {
884        let compression: CompressionTypeVariant = opts.compression().into();
885        Ok(JsonWriterOptions::new(compression))
886    }
887}
888
889impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
890    type Error = DataFusionError;
891
892    fn try_from(
893        proto_opts: &protobuf::CsvOptions,
894    ) -> datafusion_common::Result<Self, Self::Error> {
895        Ok(CsvOptions {
896            has_header: proto_opts.has_header.first().map(|h| *h != 0),
897            delimiter: proto_opts.delimiter[0],
898            quote: proto_opts.quote[0],
899            terminator: proto_opts.terminator.first().copied(),
900            escape: proto_opts.escape.first().copied(),
901            double_quote: proto_opts.has_header.first().map(|h| *h != 0),
902            newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
903            compression: proto_opts.compression().into(),
904            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
905            date_format: (!proto_opts.date_format.is_empty())
906                .then(|| proto_opts.date_format.clone()),
907            datetime_format: (!proto_opts.datetime_format.is_empty())
908                .then(|| proto_opts.datetime_format.clone()),
909            timestamp_format: (!proto_opts.timestamp_format.is_empty())
910                .then(|| proto_opts.timestamp_format.clone()),
911            timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
912                .then(|| proto_opts.timestamp_tz_format.clone()),
913            time_format: (!proto_opts.time_format.is_empty())
914                .then(|| proto_opts.time_format.clone()),
915            null_value: (!proto_opts.null_value.is_empty())
916                .then(|| proto_opts.null_value.clone()),
917            null_regex: (!proto_opts.null_regex.is_empty())
918                .then(|| proto_opts.null_regex.clone()),
919            comment: proto_opts.comment.first().copied(),
920        })
921    }
922}
923
924impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
925    type Error = DataFusionError;
926
927    fn try_from(
928        value: &protobuf::ParquetOptions,
929    ) -> datafusion_common::Result<Self, Self::Error> {
930        #[allow(deprecated)] // max_statistics_size
931        Ok(ParquetOptions {
932            enable_page_index: value.enable_page_index,
933            pruning: value.pruning,
934            skip_metadata: value.skip_metadata,
935            metadata_size_hint: value
936                .metadata_size_hint_opt
937                .map(|opt| match opt {
938                    protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
939                })
940                .unwrap_or(None),
941            pushdown_filters: value.pushdown_filters,
942            reorder_filters: value.reorder_filters,
943            data_pagesize_limit: value.data_pagesize_limit as usize,
944            write_batch_size: value.write_batch_size as usize,
945            writer_version: value.writer_version.clone(),
946            compression: value.compression_opt.clone().map(|opt| match opt {
947                protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
948            }).unwrap_or(None),
949            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
950            // Continuing from where we left off in the TryFrom implementation
951            dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
952            statistics_enabled: value
953                .statistics_enabled_opt.clone()
954                .map(|opt| match opt {
955                    protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
956                })
957                .unwrap_or(None),
958            max_row_group_size: value.max_row_group_size as usize,
959            created_by: value.created_by.clone(),
960            column_index_truncate_length: value
961                .column_index_truncate_length_opt.as_ref()
962                .map(|opt| match opt {
963                    protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
964                })
965                .unwrap_or(None),
966            statistics_truncate_length: value
967                .statistics_truncate_length_opt.as_ref()
968                .map(|opt| match opt {
969                    protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
970                })
971                .unwrap_or(None),
972            data_page_row_count_limit: value.data_page_row_count_limit as usize,
973            encoding: value
974                .encoding_opt.clone()
975                .map(|opt| match opt {
976                    protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
977                })
978                .unwrap_or(None),
979            bloom_filter_on_read: value.bloom_filter_on_read,
980            bloom_filter_on_write: value.bloom_filter_on_write,
981            bloom_filter_fpp: value.clone()
982                .bloom_filter_fpp_opt
983                .map(|opt| match opt {
984                    protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
985                })
986                .unwrap_or(None),
987            bloom_filter_ndv: value.clone()
988                .bloom_filter_ndv_opt
989                .map(|opt| match opt {
990                    protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
991                })
992                .unwrap_or(None),
993            allow_single_file_parallelism: value.allow_single_file_parallelism,
994            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
995            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
996            schema_force_view_types: value.schema_force_view_types,
997            binary_as_string: value.binary_as_string,
998            coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
999                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1000            }).unwrap_or(None),
1001            skip_arrow_metadata: value.skip_arrow_metadata,
1002        })
1003    }
1004}
1005
1006impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1007    type Error = DataFusionError;
1008    fn try_from(
1009        value: &protobuf::ParquetColumnOptions,
1010    ) -> datafusion_common::Result<Self, Self::Error> {
1011        #[allow(deprecated)] // max_statistics_size
1012        Ok(ParquetColumnOptions {
1013            compression: value.compression_opt.clone().map(|opt| match opt {
1014                protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1015            }).unwrap_or(None),
1016            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1017            statistics_enabled: value
1018                .statistics_enabled_opt.clone()
1019                .map(|opt| match opt {
1020                    protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1021                })
1022                .unwrap_or(None),
1023            encoding: value
1024                .encoding_opt.clone()
1025                .map(|opt| match opt {
1026                    protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1027                })
1028                .unwrap_or(None),
1029            bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1030                protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1031            })
1032                .unwrap_or(None),
1033            bloom_filter_fpp: value
1034                .bloom_filter_fpp_opt
1035                .map(|opt| match opt {
1036                    protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1037                })
1038                .unwrap_or(None),
1039            bloom_filter_ndv: value
1040                .bloom_filter_ndv_opt
1041                .map(|opt| match opt {
1042                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1043                })
1044                .unwrap_or(None),
1045        })
1046    }
1047}
1048
1049impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1050    type Error = DataFusionError;
1051    fn try_from(
1052        value: &protobuf::TableParquetOptions,
1053    ) -> datafusion_common::Result<Self, Self::Error> {
1054        let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1055            HashMap::new();
1056        for protobuf::ParquetColumnSpecificOptions {
1057            column_name,
1058            options: maybe_options,
1059        } in &value.column_specific_options
1060        {
1061            if let Some(options) = maybe_options {
1062                column_specific_options.insert(column_name.clone(), options.try_into()?);
1063            }
1064        }
1065        Ok(TableParquetOptions {
1066            global: value
1067                .global
1068                .as_ref()
1069                .map(|v| v.try_into())
1070                .unwrap()
1071                .unwrap(),
1072            column_specific_options,
1073            key_value_metadata: Default::default(),
1074            crypto: Default::default(),
1075        })
1076    }
1077}
1078
1079impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1080    type Error = DataFusionError;
1081
1082    fn try_from(
1083        proto_opts: &protobuf::JsonOptions,
1084    ) -> datafusion_common::Result<Self, Self::Error> {
1085        let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1086        Ok(JsonOptions {
1087            compression: compression.into(),
1088            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1089        })
1090    }
1091}
1092
1093pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1094    protobuf::TimeUnit::try_from(*value)
1095        .map(|t| t.into())
1096        .map_err(|_| Error::unknown("TimeUnit", *value))
1097}
1098
1099pub fn parse_i32_to_interval_unit(
1100    value: &i32,
1101) -> datafusion_common::Result<IntervalUnit, Error> {
1102    protobuf::IntervalUnit::try_from(*value)
1103        .map(|t| t.into())
1104        .map_err(|_| Error::unknown("IntervalUnit", *value))
1105}
1106
/// Converts a `Vec<T>` into a fixed-size array `[T; N]`.
///
/// # Panics
/// Panics when `v.len() != N` — there is no better way to surface a length
/// mismatch when converting from Vec to Array.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    let actual = v.len();
    match v.try_into() {
        Ok(arr) => arr,
        Err(_) => panic!("Expected a Vec of length {} but it was {}", N, actual),
    }
}
1113
1114/// Converts a vector of `protobuf::Field`s to `Arc<arrow::Field>`s.
1115pub fn parse_proto_fields_to_fields<'a, I>(
1116    fields: I,
1117) -> std::result::Result<Vec<Field>, Error>
1118where
1119    I: IntoIterator<Item = &'a protobuf::Field>,
1120{
1121    fields
1122        .into_iter()
1123        .map(Field::try_from)
1124        .collect::<datafusion_common::Result<_, _>>()
1125}
1126
1127pub(crate) fn csv_writer_options_from_proto(
1128    writer_options: &protobuf::CsvWriterOptions,
1129) -> datafusion_common::Result<WriterBuilder> {
1130    let mut builder = WriterBuilder::new();
1131    if !writer_options.delimiter.is_empty() {
1132        if let Some(delimiter) = writer_options.delimiter.chars().next() {
1133            if delimiter.is_ascii() {
1134                builder = builder.with_delimiter(delimiter as u8);
1135            } else {
1136                return Err(proto_error("CSV Delimiter is not ASCII"));
1137            }
1138        } else {
1139            return Err(proto_error("Error parsing CSV Delimiter"));
1140        }
1141    }
1142    if !writer_options.quote.is_empty() {
1143        if let Some(quote) = writer_options.quote.chars().next() {
1144            if quote.is_ascii() {
1145                builder = builder.with_quote(quote as u8);
1146            } else {
1147                return Err(proto_error("CSV Quote is not ASCII"));
1148            }
1149        } else {
1150            return Err(proto_error("Error parsing CSV Quote"));
1151        }
1152    }
1153    if !writer_options.escape.is_empty() {
1154        if let Some(escape) = writer_options.escape.chars().next() {
1155            if escape.is_ascii() {
1156                builder = builder.with_escape(escape as u8);
1157            } else {
1158                return Err(proto_error("CSV Escape is not ASCII"));
1159            }
1160        } else {
1161            return Err(proto_error("Error parsing CSV Escape"));
1162        }
1163    }
1164    Ok(builder
1165        .with_header(writer_options.has_header)
1166        .with_date_format(writer_options.date_format.clone())
1167        .with_datetime_format(writer_options.datetime_format.clone())
1168        .with_timestamp_format(writer_options.timestamp_format.clone())
1169        .with_time_format(writer_options.time_format.clone())
1170        .with_null(writer_options.null_value.clone())
1171        .with_double_quote(writer_options.double_quote))
1172}