datafusion_proto_common/from_proto/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    i256, DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
29    Schema, TimeUnit, UnionFields, UnionMode,
30};
31use arrow::ipc::{reader::read_record_batch, root_as_message};
32
33use datafusion_common::{
34    arrow_datafusion_err,
35    config::{
36        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
37        TableParquetOptions,
38    },
39    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
40    parsers::CompressionTypeVariant,
41    plan_datafusion_err,
42    stats::Precision,
43    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
44    DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
45};
46
/// Errors that can be produced while converting protobuf messages into their
/// Arrow / DataFusion counterparts.
#[derive(Debug)]
pub enum Error {
    /// A free-form failure described by the contained message.
    General(String),

    /// A wrapped [`DataFusionError`].
    DataFusionError(DataFusionError),

    /// A required protobuf field was not set; holds the field name.
    MissingRequiredField(String),

    /// At least one value was expected but none was found; holds the name of
    /// the expected item.
    AtLeastOneValue(String),

    /// An `i32` could not be mapped onto the enum called `name`.
    UnknownEnumVariant { name: String, value: i32 },
}
59
60impl std::fmt::Display for Error {
61    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62        match self {
63            Self::General(desc) => write!(f, "General error: {desc}"),
64
65            Self::DataFusionError(desc) => {
66                write!(f, "DataFusion error: {desc:?}")
67            }
68
69            Self::MissingRequiredField(name) => {
70                write!(f, "Missing required field {name}")
71            }
72            Self::AtLeastOneValue(name) => {
73                write!(f, "Must have at least one {name}, found 0")
74            }
75            Self::UnknownEnumVariant { name, value } => {
76                write!(f, "Unknown i32 value for {name} enum: {value}")
77            }
78        }
79    }
80}
81
/// Marks [`Error`] as a standard error type; all trait methods keep their
/// default implementations.
impl std::error::Error for Error {}
83
84impl From<DataFusionError> for Error {
85    fn from(e: DataFusionError) -> Self {
86        Error::DataFusionError(e)
87    }
88}
89
90impl Error {
91    pub fn required(field: impl Into<String>) -> Error {
92        Error::MissingRequiredField(field.into())
93    }
94
95    pub fn unknown(name: impl Into<String>, value: i32) -> Error {
96        Error::UnknownEnumVariant {
97            name: name.into(),
98            value,
99        }
100    }
101}
102
103impl From<Error> for DataFusionError {
104    fn from(e: Error) -> Self {
105        plan_datafusion_err!("{}", e)
106    }
107}
108
/// An extension trait that adds the methods `optional` and `required` to any
/// `Option` whose contents implement `TryInto<T, Error = Error>`.
///
/// It is meant for protobuf message fields, which surface as `Option`s and
/// have to be converted into another type.
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field to an option of a different type.
    ///
    /// Returns `Ok(None)` if the option is `None`, otherwise calls
    /// [`TryInto::try_into`] on the contained data and returns any error
    /// encountered.
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field to a different type, returning an
    /// error if it is `None`.
    ///
    /// Returns `Error::MissingRequiredField` (carrying `field`) if `None`,
    /// otherwise calls [`TryInto::try_into`] on the contained data and
    /// returns any error encountered.
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
124
125impl<T, U> FromOptionalField<U> for Option<T>
126where
127    T: TryInto<U, Error = Error>,
128{
129    fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
130        self.map(|t| t.try_into()).transpose()
131    }
132
133    fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
134        match self {
135            None => Err(Error::required(field)),
136            Some(t) => t.try_into(),
137        }
138    }
139}
140
141impl From<protobuf::ColumnRelation> for TableReference {
142    fn from(rel: protobuf::ColumnRelation) -> Self {
143        Self::parse_str_normalized(rel.relation.as_str(), true)
144    }
145}
146
147impl From<protobuf::Column> for Column {
148    fn from(c: protobuf::Column) -> Self {
149        let protobuf::Column { relation, name } = c;
150
151        Self::new(relation, name)
152    }
153}
154
155impl From<&protobuf::Column> for Column {
156    fn from(c: &protobuf::Column) -> Self {
157        c.clone().into()
158    }
159}
160
161impl TryFrom<&protobuf::DfSchema> for DFSchema {
162    type Error = Error;
163
164    fn try_from(
165        df_schema: &protobuf::DfSchema,
166    ) -> datafusion_common::Result<Self, Self::Error> {
167        let df_fields = df_schema.columns.clone();
168        let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
169            .iter()
170            .map(|df_field| {
171                let field: Field = df_field.field.as_ref().required("field")?;
172                Ok((
173                    df_field.qualifier.as_ref().map(|q| q.clone().into()),
174                    Arc::new(field),
175                ))
176            })
177            .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
178
179        Ok(DFSchema::new_with_metadata(
180            qualifiers_and_fields,
181            df_schema.metadata.clone(),
182        )?)
183    }
184}
185
186impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
187    type Error = Error;
188
189    fn try_from(
190        df_schema: protobuf::DfSchema,
191    ) -> datafusion_common::Result<Self, Self::Error> {
192        let dfschema: DFSchema = (&df_schema).try_into()?;
193        Ok(Arc::new(dfschema))
194    }
195}
196
197impl TryFrom<&protobuf::ArrowType> for DataType {
198    type Error = Error;
199
200    fn try_from(
201        arrow_type: &protobuf::ArrowType,
202    ) -> datafusion_common::Result<Self, Self::Error> {
203        arrow_type
204            .arrow_type_enum
205            .as_ref()
206            .required("arrow_type_enum")
207    }
208}
209
210impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
211    type Error = Error;
212    fn try_from(
213        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
214    ) -> datafusion_common::Result<Self, Self::Error> {
215        use protobuf::arrow_type;
216        Ok(match arrow_type_enum {
217            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
218            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
219            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
220            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
221            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
222            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
223            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
224            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
225            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
226            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
227            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
228            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
229            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
230            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
231            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
232            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
233            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
234            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
235            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
236                DataType::FixedSizeBinary(*size)
237            }
238            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
239            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
240            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
241            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
242                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
243            }
244            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
245                time_unit,
246                timezone,
247            }) => DataType::Timestamp(
248                parse_i32_to_time_unit(time_unit)?,
249                match timezone.len() {
250                    0 => None,
251                    _ => Some(timezone.as_str().into()),
252                },
253            ),
254            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
255                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
256            }
257            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
258                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
259            }
260            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
261                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
262            }
263            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
264                precision,
265                scale,
266            }) => DataType::Decimal32(*precision as u8, *scale as i8),
267            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
268                precision,
269                scale,
270            }) => DataType::Decimal64(*precision as u8, *scale as i8),
271            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
272                precision,
273                scale,
274            }) => DataType::Decimal128(*precision as u8, *scale as i8),
275            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
276                precision,
277                scale,
278            }) => DataType::Decimal256(*precision as u8, *scale as i8),
279            arrow_type::ArrowTypeEnum::List(list) => {
280                let list_type =
281                    list.as_ref().field_type.as_deref().required("field_type")?;
282                DataType::List(Arc::new(list_type))
283            }
284            arrow_type::ArrowTypeEnum::LargeList(list) => {
285                let list_type =
286                    list.as_ref().field_type.as_deref().required("field_type")?;
287                DataType::LargeList(Arc::new(list_type))
288            }
289            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
290                let list_type =
291                    list.as_ref().field_type.as_deref().required("field_type")?;
292                let list_size = list.list_size;
293                DataType::FixedSizeList(Arc::new(list_type), list_size)
294            }
295            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
296                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
297            ),
298            arrow_type::ArrowTypeEnum::Union(union) => {
299                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
300                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
301                let union_mode = match union_mode {
302                    protobuf::UnionMode::Dense => UnionMode::Dense,
303                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
304                };
305                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;
306
307                // Default to index based type ids if not provided
308                let type_ids: Vec<_> = match union.type_ids.is_empty() {
309                    true => (0..union_fields.len() as i8).collect(),
310                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
311                };
312
313                DataType::Union(UnionFields::new(type_ids, union_fields), union_mode)
314            }
315            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
316                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
317                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
318                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
319            }
320            arrow_type::ArrowTypeEnum::Map(map) => {
321                let field: Field =
322                    map.as_ref().field_type.as_deref().required("field_type")?;
323                let keys_sorted = map.keys_sorted;
324                DataType::Map(Arc::new(field), keys_sorted)
325            }
326        })
327    }
328}
329
330impl TryFrom<&protobuf::Field> for Field {
331    type Error = Error;
332    fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
333        let datatype = field.arrow_type.as_deref().required("arrow_type")?;
334        let field = Self::new(field.name.as_str(), datatype, field.nullable)
335            .with_metadata(field.metadata.clone());
336        Ok(field)
337    }
338}
339
340impl TryFrom<&protobuf::Schema> for Schema {
341    type Error = Error;
342
343    fn try_from(
344        schema: &protobuf::Schema,
345    ) -> datafusion_common::Result<Self, Self::Error> {
346        let fields = schema
347            .columns
348            .iter()
349            .map(Field::try_from)
350            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
351        Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
352    }
353}
354
355impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
356    type Error = Error;
357
358    fn try_from(
359        scalar: &protobuf::ScalarValue,
360    ) -> datafusion_common::Result<Self, Self::Error> {
361        use protobuf::scalar_value::Value;
362
363        let value = scalar
364            .value
365            .as_ref()
366            .ok_or_else(|| Error::required("value"))?;
367
368        Ok(match value {
369            Value::BoolValue(v) => Self::Boolean(Some(*v)),
370            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
371            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
372            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
373            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
374            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
375            Value::Int32Value(v) => Self::Int32(Some(*v)),
376            Value::Int64Value(v) => Self::Int64(Some(*v)),
377            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
378            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
379            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
380            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
381            Value::Float32Value(v) => Self::Float32(Some(*v)),
382            Value::Float64Value(v) => Self::Float64(Some(*v)),
383            Value::Date32Value(v) => Self::Date32(Some(*v)),
384            // ScalarValue::List is serialized using arrow IPC format
385            Value::ListValue(v)
386            | Value::FixedSizeListValue(v)
387            | Value::LargeListValue(v)
388            | Value::StructValue(v)
389            | Value::MapValue(v) => {
390                let protobuf::ScalarNestedValue {
391                    ipc_message,
392                    arrow_data,
393                    dictionaries,
394                    schema,
395                } = &v;
396
397                let schema: Schema = if let Some(schema_ref) = schema {
398                    schema_ref.try_into()?
399                } else {
400                    return Err(Error::General(
401                        "Invalid schema while deserializing ScalarValue::List"
402                            .to_string(),
403                    ));
404                };
405
406                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
407                    Error::General(format!(
408                        "Error IPC message while deserializing ScalarValue::List: {e}"
409                    ))
410                })?;
411                let buffer = Buffer::from(arrow_data.as_slice());
412
413                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
414                    Error::General(
415                        "Unexpected message type deserializing ScalarValue::List"
416                            .to_string(),
417                    )
418                })?;
419
420                let dict_by_id: HashMap<i64,ArrayRef> = dictionaries.iter().map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
421                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
422                        Error::General(format!(
423                            "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
424                        ))
425                    })?;
426                    let buffer = Buffer::from(arrow_data.as_slice());
427
428                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
429                        Error::General(
430                            "Unexpected message type deserializing ScalarValue::List dictionary message"
431                                .to_string(),
432                        )
433                    })?;
434
435                    let id = dict_batch.id();
436
437                    let record_batch = read_record_batch(
438                        &buffer,
439                        dict_batch.data().unwrap(),
440                        Arc::new(schema.clone()),
441                        &Default::default(),
442                        None,
443                        &message.version(),
444                    )?;
445
446                    let values: ArrayRef = Arc::clone(record_batch.column(0));
447
448                    Ok((id, values))
449                }).collect::<datafusion_common::Result<HashMap<_, _>>>()?;
450
451                let record_batch = read_record_batch(
452                    &buffer,
453                    ipc_batch,
454                    Arc::new(schema),
455                    &dict_by_id,
456                    None,
457                    &message.version(),
458                )
459                .map_err(|e| arrow_datafusion_err!(e))
460                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
461                let arr = record_batch.column(0);
462                match value {
463                    Value::ListValue(_) => {
464                        Self::List(arr.as_list::<i32>().to_owned().into())
465                    }
466                    Value::LargeListValue(_) => {
467                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
468                    }
469                    Value::FixedSizeListValue(_) => {
470                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
471                    }
472                    Value::StructValue(_) => {
473                        Self::Struct(arr.as_struct().to_owned().into())
474                    }
475                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
476                    _ => unreachable!(),
477                }
478            }
479            Value::NullValue(v) => {
480                let null_type: DataType = v.try_into()?;
481                null_type.try_into().map_err(Error::DataFusionError)?
482            }
483            Value::Decimal32Value(val) => {
484                let array = vec_to_array(val.value.clone());
485                Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
486            }
487            Value::Decimal64Value(val) => {
488                let array = vec_to_array(val.value.clone());
489                Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
490            }
491            Value::Decimal128Value(val) => {
492                let array = vec_to_array(val.value.clone());
493                Self::Decimal128(
494                    Some(i128::from_be_bytes(array)),
495                    val.p as u8,
496                    val.s as i8,
497                )
498            }
499            Value::Decimal256Value(val) => {
500                let array = vec_to_array(val.value.clone());
501                Self::Decimal256(
502                    Some(i256::from_be_bytes(array)),
503                    val.p as u8,
504                    val.s as i8,
505                )
506            }
507            Value::Date64Value(v) => Self::Date64(Some(*v)),
508            Value::Time32Value(v) => {
509                let time_value =
510                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
511                match time_value {
512                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
513                        Self::Time32Second(Some(*t))
514                    }
515                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
516                        Self::Time32Millisecond(Some(*t))
517                    }
518                }
519            }
520            Value::Time64Value(v) => {
521                let time_value =
522                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
523                match time_value {
524                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
525                        Self::Time64Microsecond(Some(*t))
526                    }
527                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
528                        Self::Time64Nanosecond(Some(*t))
529                    }
530                }
531            }
532            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
533            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
534            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
535            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
536            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
537            Value::TimestampValue(v) => {
538                let timezone = if v.timezone.is_empty() {
539                    None
540                } else {
541                    Some(v.timezone.as_str().into())
542                };
543
544                let ts_value =
545                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
546
547                match ts_value {
548                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
549                        Self::TimestampMicrosecond(Some(*t), timezone)
550                    }
551                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
552                        Self::TimestampNanosecond(Some(*t), timezone)
553                    }
554                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
555                        Self::TimestampSecond(Some(*t), timezone)
556                    }
557                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
558                        Self::TimestampMillisecond(Some(*t), timezone)
559                    }
560                }
561            }
562            Value::DictionaryValue(v) => {
563                let index_type: DataType = v
564                    .index_type
565                    .as_ref()
566                    .ok_or_else(|| Error::required("index_type"))?
567                    .try_into()?;
568
569                let value: Self = v
570                    .value
571                    .as_ref()
572                    .ok_or_else(|| Error::required("value"))?
573                    .as_ref()
574                    .try_into()?;
575
576                Self::Dictionary(Box::new(index_type), Box::new(value))
577            }
578            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
579            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
580            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
581            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
582                IntervalDayTimeType::make_value(v.days, v.milliseconds),
583            )),
584            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
585                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
586            )),
587            Value::UnionValue(val) => {
588                let mode = match val.mode {
589                    0 => UnionMode::Sparse,
590                    1 => UnionMode::Dense,
591                    id => Err(Error::unknown("UnionMode", id))?,
592                };
593                let ids = val
594                    .fields
595                    .iter()
596                    .map(|f| f.field_id as i8)
597                    .collect::<Vec<_>>();
598                let fields = val
599                    .fields
600                    .iter()
601                    .map(|f| f.field.clone())
602                    .collect::<Option<Vec<_>>>();
603                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
604                let fields = parse_proto_fields_to_fields(&fields)?;
605                let fields = UnionFields::new(ids, fields);
606                let v_id = val.value_id as i8;
607                let val = match &val.value {
608                    None => None,
609                    Some(val) => {
610                        let val: ScalarValue = val
611                            .as_ref()
612                            .try_into()
613                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
614                        Some((v_id, Box::new(val)))
615                    }
616                };
617                Self::Union(val, fields, mode)
618            }
619            Value::FixedSizeBinaryValue(v) => {
620                Self::FixedSizeBinary(v.length, Some(v.clone().values))
621            }
622        })
623    }
624}
625
626impl From<protobuf::TimeUnit> for TimeUnit {
627    fn from(time_unit: protobuf::TimeUnit) -> Self {
628        match time_unit {
629            protobuf::TimeUnit::Second => TimeUnit::Second,
630            protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
631            protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
632            protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
633        }
634    }
635}
636
637impl From<protobuf::IntervalUnit> for IntervalUnit {
638    fn from(interval_unit: protobuf::IntervalUnit) -> Self {
639        match interval_unit {
640            protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
641            protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
642            protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
643        }
644    }
645}
646
647impl From<protobuf::Constraints> for Constraints {
648    fn from(constraints: protobuf::Constraints) -> Self {
649        Constraints::new_unverified(
650            constraints
651                .constraints
652                .into_iter()
653                .map(|item| item.into())
654                .collect(),
655        )
656    }
657}
658
659impl From<protobuf::Constraint> for Constraint {
660    fn from(value: protobuf::Constraint) -> Self {
661        match value.constraint_mode.unwrap() {
662            protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
663                Constraint::PrimaryKey(
664                    elem.indices.into_iter().map(|item| item as usize).collect(),
665                )
666            }
667            protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
668                elem.indices.into_iter().map(|item| item as usize).collect(),
669            ),
670        }
671    }
672}
673
674impl From<&protobuf::ColumnStats> for ColumnStatistics {
675    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
676        ColumnStatistics {
677            null_count: if let Some(nc) = &cs.null_count {
678                nc.clone().into()
679            } else {
680                Precision::Absent
681            },
682            max_value: if let Some(max) = &cs.max_value {
683                max.clone().into()
684            } else {
685                Precision::Absent
686            },
687            min_value: if let Some(min) = &cs.min_value {
688                min.clone().into()
689            } else {
690                Precision::Absent
691            },
692            sum_value: if let Some(sum) = &cs.sum_value {
693                sum.clone().into()
694            } else {
695                Precision::Absent
696            },
697            distinct_count: if let Some(dc) = &cs.distinct_count {
698                dc.clone().into()
699            } else {
700                Precision::Absent
701            },
702        }
703    }
704}
705
706impl From<protobuf::Precision> for Precision<usize> {
707    fn from(s: protobuf::Precision) -> Self {
708        let Ok(precision_type) = s.precision_info.try_into() else {
709            return Precision::Absent;
710        };
711        match precision_type {
712            protobuf::PrecisionInfo::Exact => {
713                if let Some(val) = s.val {
714                    if let Ok(ScalarValue::UInt64(Some(val))) =
715                        ScalarValue::try_from(&val)
716                    {
717                        Precision::Exact(val as usize)
718                    } else {
719                        Precision::Absent
720                    }
721                } else {
722                    Precision::Absent
723                }
724            }
725            protobuf::PrecisionInfo::Inexact => {
726                if let Some(val) = s.val {
727                    if let Ok(ScalarValue::UInt64(Some(val))) =
728                        ScalarValue::try_from(&val)
729                    {
730                        Precision::Inexact(val as usize)
731                    } else {
732                        Precision::Absent
733                    }
734                } else {
735                    Precision::Absent
736                }
737            }
738            protobuf::PrecisionInfo::Absent => Precision::Absent,
739        }
740    }
741}
742
743impl From<protobuf::Precision> for Precision<ScalarValue> {
744    fn from(s: protobuf::Precision) -> Self {
745        let Ok(precision_type) = s.precision_info.try_into() else {
746            return Precision::Absent;
747        };
748        match precision_type {
749            protobuf::PrecisionInfo::Exact => {
750                if let Some(val) = s.val {
751                    if let Ok(val) = ScalarValue::try_from(&val) {
752                        Precision::Exact(val)
753                    } else {
754                        Precision::Absent
755                    }
756                } else {
757                    Precision::Absent
758                }
759            }
760            protobuf::PrecisionInfo::Inexact => {
761                if let Some(val) = s.val {
762                    if let Ok(val) = ScalarValue::try_from(&val) {
763                        Precision::Inexact(val)
764                    } else {
765                        Precision::Absent
766                    }
767                } else {
768                    Precision::Absent
769                }
770            }
771            protobuf::PrecisionInfo::Absent => Precision::Absent,
772        }
773    }
774}
775
776impl From<protobuf::JoinSide> for JoinSide {
777    fn from(t: protobuf::JoinSide) -> Self {
778        match t {
779            protobuf::JoinSide::LeftSide => JoinSide::Left,
780            protobuf::JoinSide::RightSide => JoinSide::Right,
781            protobuf::JoinSide::None => JoinSide::None,
782        }
783    }
784}
785
786impl From<&protobuf::Constraint> for Constraint {
787    fn from(value: &protobuf::Constraint) -> Self {
788        match &value.constraint_mode {
789            Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
790                Constraint::PrimaryKey(
791                    elem.indices.iter().map(|&item| item as usize).collect(),
792                )
793            }
794            Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
795                Constraint::Unique(
796                    elem.indices.iter().map(|&item| item as usize).collect(),
797                )
798            }
799            None => panic!("constraint_mode not set"),
800        }
801    }
802}
803
804impl TryFrom<&protobuf::Constraints> for Constraints {
805    type Error = DataFusionError;
806
807    fn try_from(
808        constraints: &protobuf::Constraints,
809    ) -> datafusion_common::Result<Self, Self::Error> {
810        Ok(Constraints::new_unverified(
811            constraints
812                .constraints
813                .iter()
814                .map(|item| item.into())
815                .collect(),
816        ))
817    }
818}
819
820impl TryFrom<&protobuf::Statistics> for Statistics {
821    type Error = DataFusionError;
822
823    fn try_from(
824        s: &protobuf::Statistics,
825    ) -> datafusion_common::Result<Self, Self::Error> {
826        // Keep it sync with Statistics::to_proto
827        Ok(Statistics {
828            num_rows: if let Some(nr) = &s.num_rows {
829                nr.clone().into()
830            } else {
831                Precision::Absent
832            },
833            total_byte_size: if let Some(tbs) = &s.total_byte_size {
834                tbs.clone().into()
835            } else {
836                Precision::Absent
837            },
838            // No column statistic (None) is encoded with empty array
839            column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
840        })
841    }
842}
843
844impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
845    fn from(value: protobuf::CompressionTypeVariant) -> Self {
846        match value {
847            protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
848            protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
849            protobuf::CompressionTypeVariant::Xz => Self::XZ,
850            protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
851            protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
852        }
853    }
854}
855
856impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
857    fn from(value: CompressionTypeVariant) -> Self {
858        match value {
859            CompressionTypeVariant::GZIP => Self::Gzip,
860            CompressionTypeVariant::BZIP2 => Self::Bzip2,
861            CompressionTypeVariant::XZ => Self::Xz,
862            CompressionTypeVariant::ZSTD => Self::Zstd,
863            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
864        }
865    }
866}
867
868impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
869    type Error = DataFusionError;
870
871    fn try_from(
872        opts: &protobuf::CsvWriterOptions,
873    ) -> datafusion_common::Result<Self, Self::Error> {
874        let write_options = csv_writer_options_from_proto(opts)?;
875        let compression: CompressionTypeVariant = opts.compression().into();
876        Ok(CsvWriterOptions::new(write_options, compression))
877    }
878}
879
880impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
881    type Error = DataFusionError;
882
883    fn try_from(
884        opts: &protobuf::JsonWriterOptions,
885    ) -> datafusion_common::Result<Self, Self::Error> {
886        let compression: CompressionTypeVariant = opts.compression().into();
887        Ok(JsonWriterOptions::new(compression))
888    }
889}
890
891impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
892    type Error = DataFusionError;
893
894    fn try_from(
895        proto_opts: &protobuf::CsvOptions,
896    ) -> datafusion_common::Result<Self, Self::Error> {
897        Ok(CsvOptions {
898            has_header: proto_opts.has_header.first().map(|h| *h != 0),
899            delimiter: proto_opts.delimiter[0],
900            quote: proto_opts.quote[0],
901            terminator: proto_opts.terminator.first().copied(),
902            escape: proto_opts.escape.first().copied(),
903            double_quote: proto_opts.has_header.first().map(|h| *h != 0),
904            newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
905            compression: proto_opts.compression().into(),
906            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
907            date_format: (!proto_opts.date_format.is_empty())
908                .then(|| proto_opts.date_format.clone()),
909            datetime_format: (!proto_opts.datetime_format.is_empty())
910                .then(|| proto_opts.datetime_format.clone()),
911            timestamp_format: (!proto_opts.timestamp_format.is_empty())
912                .then(|| proto_opts.timestamp_format.clone()),
913            timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
914                .then(|| proto_opts.timestamp_tz_format.clone()),
915            time_format: (!proto_opts.time_format.is_empty())
916                .then(|| proto_opts.time_format.clone()),
917            null_value: (!proto_opts.null_value.is_empty())
918                .then(|| proto_opts.null_value.clone()),
919            null_regex: (!proto_opts.null_regex.is_empty())
920                .then(|| proto_opts.null_regex.clone()),
921            comment: proto_opts.comment.first().copied(),
922            truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
923        })
924    }
925}
926
927impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
928    type Error = DataFusionError;
929
930    fn try_from(
931        value: &protobuf::ParquetOptions,
932    ) -> datafusion_common::Result<Self, Self::Error> {
933        #[allow(deprecated)] // max_statistics_size
934        Ok(ParquetOptions {
935            enable_page_index: value.enable_page_index,
936            pruning: value.pruning,
937            skip_metadata: value.skip_metadata,
938            metadata_size_hint: value
939                .metadata_size_hint_opt
940                .map(|opt| match opt {
941                    protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
942                })
943                .unwrap_or(None),
944            pushdown_filters: value.pushdown_filters,
945            reorder_filters: value.reorder_filters,
946            data_pagesize_limit: value.data_pagesize_limit as usize,
947            write_batch_size: value.write_batch_size as usize,
948            writer_version: value.writer_version.clone(),
949            compression: value.compression_opt.clone().map(|opt| match opt {
950                protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
951            }).unwrap_or(None),
952            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
953            // Continuing from where we left off in the TryFrom implementation
954            dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
955            statistics_enabled: value
956                .statistics_enabled_opt.clone()
957                .map(|opt| match opt {
958                    protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
959                })
960                .unwrap_or(None),
961            max_row_group_size: value.max_row_group_size as usize,
962            created_by: value.created_by.clone(),
963            column_index_truncate_length: value
964                .column_index_truncate_length_opt.as_ref()
965                .map(|opt| match opt {
966                    protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
967                })
968                .unwrap_or(None),
969            statistics_truncate_length: value
970                .statistics_truncate_length_opt.as_ref()
971                .map(|opt| match opt {
972                    protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
973                })
974                .unwrap_or(None),
975            data_page_row_count_limit: value.data_page_row_count_limit as usize,
976            encoding: value
977                .encoding_opt.clone()
978                .map(|opt| match opt {
979                    protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
980                })
981                .unwrap_or(None),
982            bloom_filter_on_read: value.bloom_filter_on_read,
983            bloom_filter_on_write: value.bloom_filter_on_write,
984            bloom_filter_fpp: value.clone()
985                .bloom_filter_fpp_opt
986                .map(|opt| match opt {
987                    protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
988                })
989                .unwrap_or(None),
990            bloom_filter_ndv: value.clone()
991                .bloom_filter_ndv_opt
992                .map(|opt| match opt {
993                    protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
994                })
995                .unwrap_or(None),
996            allow_single_file_parallelism: value.allow_single_file_parallelism,
997            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
998            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
999            schema_force_view_types: value.schema_force_view_types,
1000            binary_as_string: value.binary_as_string,
1001            coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1002                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1003            }).unwrap_or(None),
1004            skip_arrow_metadata: value.skip_arrow_metadata,
1005            max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1006                protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1007            }).unwrap_or(None),
1008        })
1009    }
1010}
1011
1012impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1013    type Error = DataFusionError;
1014    fn try_from(
1015        value: &protobuf::ParquetColumnOptions,
1016    ) -> datafusion_common::Result<Self, Self::Error> {
1017        #[allow(deprecated)] // max_statistics_size
1018        Ok(ParquetColumnOptions {
1019            compression: value.compression_opt.clone().map(|opt| match opt {
1020                protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1021            }).unwrap_or(None),
1022            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1023            statistics_enabled: value
1024                .statistics_enabled_opt.clone()
1025                .map(|opt| match opt {
1026                    protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1027                })
1028                .unwrap_or(None),
1029            encoding: value
1030                .encoding_opt.clone()
1031                .map(|opt| match opt {
1032                    protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1033                })
1034                .unwrap_or(None),
1035            bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1036                protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1037            })
1038                .unwrap_or(None),
1039            bloom_filter_fpp: value
1040                .bloom_filter_fpp_opt
1041                .map(|opt| match opt {
1042                    protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1043                })
1044                .unwrap_or(None),
1045            bloom_filter_ndv: value
1046                .bloom_filter_ndv_opt
1047                .map(|opt| match opt {
1048                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1049                })
1050                .unwrap_or(None),
1051        })
1052    }
1053}
1054
1055impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1056    type Error = DataFusionError;
1057    fn try_from(
1058        value: &protobuf::TableParquetOptions,
1059    ) -> datafusion_common::Result<Self, Self::Error> {
1060        let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1061            HashMap::new();
1062        for protobuf::ParquetColumnSpecificOptions {
1063            column_name,
1064            options: maybe_options,
1065        } in &value.column_specific_options
1066        {
1067            if let Some(options) = maybe_options {
1068                column_specific_options.insert(column_name.clone(), options.try_into()?);
1069            }
1070        }
1071        Ok(TableParquetOptions {
1072            global: value
1073                .global
1074                .as_ref()
1075                .map(|v| v.try_into())
1076                .unwrap()
1077                .unwrap(),
1078            column_specific_options,
1079            key_value_metadata: Default::default(),
1080            crypto: Default::default(),
1081        })
1082    }
1083}
1084
1085impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1086    type Error = DataFusionError;
1087
1088    fn try_from(
1089        proto_opts: &protobuf::JsonOptions,
1090    ) -> datafusion_common::Result<Self, Self::Error> {
1091        let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1092        Ok(JsonOptions {
1093            compression: compression.into(),
1094            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1095        })
1096    }
1097}
1098
1099pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1100    protobuf::TimeUnit::try_from(*value)
1101        .map(|t| t.into())
1102        .map_err(|_| Error::unknown("TimeUnit", *value))
1103}
1104
1105pub fn parse_i32_to_interval_unit(
1106    value: &i32,
1107) -> datafusion_common::Result<IntervalUnit, Error> {
1108    protobuf::IntervalUnit::try_from(*value)
1109        .map(|t| t.into())
1110        .map_err(|_| Error::unknown("IntervalUnit", *value))
1111}
1112
/// Converts a `Vec<T>` into a fixed-size array `[T; N]`.
///
/// # Panics
///
/// Panics when the vector's length is not exactly `N`; the message reports
/// both the expected and the actual length.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    match <[T; N]>::try_from(v) {
        Ok(array) => array,
        // On failure the conversion hands the original vector back, which
        // lets the panic message report its length.
        Err(rejected) => panic!(
            "Expected a Vec of length {} but it was {}",
            N,
            rejected.len()
        ),
    }
}
1119
1120/// Converts a vector of `protobuf::Field`s to `Arc<arrow::Field>`s.
1121pub fn parse_proto_fields_to_fields<'a, I>(
1122    fields: I,
1123) -> std::result::Result<Vec<Field>, Error>
1124where
1125    I: IntoIterator<Item = &'a protobuf::Field>,
1126{
1127    fields
1128        .into_iter()
1129        .map(Field::try_from)
1130        .collect::<datafusion_common::Result<_, _>>()
1131}
1132
1133pub(crate) fn csv_writer_options_from_proto(
1134    writer_options: &protobuf::CsvWriterOptions,
1135) -> datafusion_common::Result<WriterBuilder> {
1136    let mut builder = WriterBuilder::new();
1137    if !writer_options.delimiter.is_empty() {
1138        if let Some(delimiter) = writer_options.delimiter.chars().next() {
1139            if delimiter.is_ascii() {
1140                builder = builder.with_delimiter(delimiter as u8);
1141            } else {
1142                return Err(proto_error("CSV Delimiter is not ASCII"));
1143            }
1144        } else {
1145            return Err(proto_error("Error parsing CSV Delimiter"));
1146        }
1147    }
1148    if !writer_options.quote.is_empty() {
1149        if let Some(quote) = writer_options.quote.chars().next() {
1150            if quote.is_ascii() {
1151                builder = builder.with_quote(quote as u8);
1152            } else {
1153                return Err(proto_error("CSV Quote is not ASCII"));
1154            }
1155        } else {
1156            return Err(proto_error("Error parsing CSV Quote"));
1157        }
1158    }
1159    if !writer_options.escape.is_empty() {
1160        if let Some(escape) = writer_options.escape.chars().next() {
1161            if escape.is_ascii() {
1162                builder = builder.with_escape(escape as u8);
1163            } else {
1164                return Err(proto_error("CSV Escape is not ASCII"));
1165            }
1166        } else {
1167            return Err(proto_error("Error parsing CSV Escape"));
1168        }
1169    }
1170    Ok(builder
1171        .with_header(writer_options.has_header)
1172        .with_date_format(writer_options.date_format.clone())
1173        .with_datetime_format(writer_options.datetime_format.clone())
1174        .with_timestamp_format(writer_options.timestamp_format.clone())
1175        .with_time_format(writer_options.time_format.clone())
1176        .with_null(writer_options.null_value.clone())
1177        .with_double_quote(writer_options.double_quote))
1178}