// datafusion_proto_common/from_proto/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29    TimeUnit, UnionFields, UnionMode, i256,
30};
31use arrow::ipc::{reader::read_record_batch, root_as_message};
32
33use datafusion_common::{
34    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
35    DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
36    arrow_datafusion_err,
37    config::{
38        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
39        TableParquetOptions,
40    },
41    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
42    parsers::CompressionTypeVariant,
43    plan_datafusion_err,
44    stats::Precision,
45};
46
/// Errors that can occur while decoding protobuf messages into
/// DataFusion / arrow types in this module.
#[derive(Debug)]
pub enum Error {
    /// Catch-all decoding failure with a free-form description.
    General(String),

    /// A [`DataFusionError`] raised while constructing the decoded value.
    DataFusionError(DataFusionError),

    /// A protobuf field that this module treats as required was absent.
    MissingRequiredField(String),

    /// A repeated field that must contain at least one entry was empty.
    AtLeastOneValue(String),

    /// An i32 wire value did not match any known variant of enum `name`.
    UnknownEnumVariant { name: String, value: i32 },
}
59
60impl std::fmt::Display for Error {
61    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62        match self {
63            Self::General(desc) => write!(f, "General error: {desc}"),
64
65            Self::DataFusionError(desc) => {
66                write!(f, "DataFusion error: {desc:?}")
67            }
68
69            Self::MissingRequiredField(name) => {
70                write!(f, "Missing required field {name}")
71            }
72            Self::AtLeastOneValue(name) => {
73                write!(f, "Must have at least one {name}, found 0")
74            }
75            Self::UnknownEnumVariant { name, value } => {
76                write!(f, "Unknown i32 value for {name} enum: {value}")
77            }
78        }
79    }
80}
81
// Marker impl so `Error` can participate in the standard error ecosystem
// (e.g. be boxed as `Box<dyn std::error::Error>`); all methods use defaults.
impl std::error::Error for Error {}
83
84impl From<DataFusionError> for Error {
85    fn from(e: DataFusionError) -> Self {
86        Error::DataFusionError(e)
87    }
88}
89
90impl Error {
91    pub fn required(field: impl Into<String>) -> Error {
92        Error::MissingRequiredField(field.into())
93    }
94
95    pub fn unknown(name: impl Into<String>, value: i32) -> Error {
96        Error::UnknownEnumVariant {
97            name: name.into(),
98            value,
99        }
100    }
101}
102
/// Lowers a decode [`Error`] into a planning `DataFusionError` (via the
/// `plan_datafusion_err!` macro) so it can flow through the common `Result`.
impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
108
/// An extension trait that adds the methods `optional` and `required` to any
/// Option containing a type implementing `TryInto<U, Error = Error>`.
///
/// This is the module's standard way of unwrapping protobuf `optional` /
/// oneof fields while converting them to their in-memory types.
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field to an option of a different type
    ///
    /// Returns None if the option is None, otherwise calls [`TryInto::try_into`]
    /// on the contained data, returning any error encountered
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field to a different type, returning an error if None
    ///
    /// Returns `Error::MissingRequiredField` if None, otherwise calls [`TryInto::try_into`]
    /// on the contained data, returning any error encountered
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
124
125impl<T, U> FromOptionalField<U> for Option<T>
126where
127    T: TryInto<U, Error = Error>,
128{
129    fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
130        self.map(|t| t.try_into()).transpose()
131    }
132
133    fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
134        match self {
135            None => Err(Error::required(field)),
136            Some(t) => t.try_into(),
137        }
138    }
139}
140
impl From<protobuf::ColumnRelation> for TableReference {
    fn from(rel: protobuf::ColumnRelation) -> Self {
        // The relation travels as a single string; re-parse it into a
        // (possibly multi-part) table reference. NOTE(review): the `true`
        // flag is forwarded to `parse_str_normalized` — presumably "input is
        // already normalized"; confirm against that function's definition.
        Self::parse_str_normalized(rel.relation.as_str(), true)
    }
}
146
147impl From<protobuf::Column> for Column {
148    fn from(c: protobuf::Column) -> Self {
149        let protobuf::Column { relation, name } = c;
150
151        Self::new(relation, name)
152    }
153}
154
impl From<&protobuf::Column> for Column {
    fn from(c: &protobuf::Column) -> Self {
        // Delegates to the owned conversion; the clone is of a small message.
        c.clone().into()
    }
}
160
161impl TryFrom<&protobuf::DfSchema> for DFSchema {
162    type Error = Error;
163
164    fn try_from(
165        df_schema: &protobuf::DfSchema,
166    ) -> datafusion_common::Result<Self, Self::Error> {
167        let df_fields = df_schema.columns.clone();
168        let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
169            .iter()
170            .map(|df_field| {
171                let field: Field = df_field.field.as_ref().required("field")?;
172                Ok((
173                    df_field.qualifier.as_ref().map(|q| q.clone().into()),
174                    Arc::new(field),
175                ))
176            })
177            .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
178
179        Ok(DFSchema::new_with_metadata(
180            qualifiers_and_fields,
181            df_schema.metadata.clone(),
182        )?)
183    }
184}
185
186impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
187    type Error = Error;
188
189    fn try_from(
190        df_schema: protobuf::DfSchema,
191    ) -> datafusion_common::Result<Self, Self::Error> {
192        let dfschema: DFSchema = (&df_schema).try_into()?;
193        Ok(Arc::new(dfschema))
194    }
195}
196
impl TryFrom<&protobuf::ArrowType> for DataType {
    type Error = Error;

    /// Unwraps the required `arrow_type_enum` oneof and delegates to the
    /// enum-level conversion; a missing oneof is `MissingRequiredField`.
    fn try_from(
        arrow_type: &protobuf::ArrowType,
    ) -> datafusion_common::Result<Self, Self::Error> {
        arrow_type
            .arrow_type_enum
            .as_ref()
            .required("arrow_type_enum")
    }
}
209
impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    /// Maps a decoded protobuf arrow-type oneof onto the corresponding arrow
    /// [`DataType`], recursing into nested field/type messages where needed.
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            // Parameter-free primitive types map one-to-one.
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            // Temporal types carry an i32 time-unit discriminant, validated
            // by `parse_i32_to_time_unit`.
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                // An empty timezone string encodes `None` on the wire.
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            // Decimal precision/scale travel as i32 on the wire; arrow stores
            // them as u8/i8. The `as` casts silently truncate out-of-range
            // values — acceptable here since they originated from a DataType.
            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
                precision,
                scale,
            }) => DataType::Decimal32(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
                precision,
                scale,
            }) => DataType::Decimal64(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            // List-family types require their (recursive) element field.
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                // Validate the i32 union-mode discriminant before mapping it
                // onto arrow's UnionMode.
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // Default to index based type ids if not provided
                let type_ids: Vec<_> = match union.type_ids.is_empty() {
                    true => (0..union_fields.len() as i8).collect(),
                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
                };

                DataType::Union(UnionFields::new(type_ids, union_fields), union_mode)
            }
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                // Both key and value types are required sub-messages.
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
        })
    }
}
329
330impl TryFrom<&protobuf::Field> for Field {
331    type Error = Error;
332    fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
333        let datatype = field.arrow_type.as_deref().required("arrow_type")?;
334        let field = Self::new(field.name.as_str(), datatype, field.nullable)
335            .with_metadata(field.metadata.clone());
336        Ok(field)
337    }
338}
339
340impl TryFrom<&protobuf::Schema> for Schema {
341    type Error = Error;
342
343    fn try_from(
344        schema: &protobuf::Schema,
345    ) -> datafusion_common::Result<Self, Self::Error> {
346        let fields = schema
347            .columns
348            .iter()
349            .map(Field::try_from)
350            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
351        Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
352    }
353}
354
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    /// Decodes a serialized [`ScalarValue`].
    ///
    /// Simple scalars are stored inline in the protobuf oneof; nested values
    /// (lists, structs, maps) travel as an arrow IPC record batch plus an
    /// optional set of dictionary batches and are decoded via
    /// [`read_record_batch`]. A missing oneof is `MissingRequiredField`.
    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            // Inline primitives: the wire type is widened (e.g. i32 for i8),
            // so narrowing `as` casts restore the original width.
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // ScalarValue::List is serialized using arrow IPC format
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                // The schema of the single-column IPC batch is required.
                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing ScalarValue::List"
                            .to_string(),
                    ));
                };

                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing ScalarValue::List: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing ScalarValue::List"
                            .to_string(),
                    )
                })?;

                // Decode every serialized dictionary batch and index the
                // resulting arrays by dictionary id so the main batch's
                // dictionary-encoded columns can be resolved below.
                let dict_by_id: HashMap<i64,ArrayRef> = dictionaries.iter().map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                        Error::General(format!(
                            "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
                        ))
                    })?;
                    let buffer = Buffer::from(arrow_data.as_slice());

                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing ScalarValue::List dictionary message"
                                .to_string(),
                        )
                    })?;

                    let id = dict_batch.id();

                    let record_batch = read_record_batch(
                        &buffer,
                        dict_batch.data().unwrap(),
                        Arc::new(schema.clone()),
                        &Default::default(),
                        None,
                        &message.version(),
                    )?;

                    let values: ArrayRef = Arc::clone(record_batch.column(0));

                    Ok((id, values))
                }).collect::<datafusion_common::Result<HashMap<_, _>>>()?;

                // Decode the single-row, single-column batch that holds the
                // nested value itself.
                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
                let arr = record_batch.column(0);
                // Downcast column 0 to the concrete array kind implied by
                // the original oneof variant.
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // This arm only matches the variants bound above.
                    _ => unreachable!(),
                }
            }
            Value::NullValue(v) => {
                // A typed null: decode the DataType, then build the
                // corresponding null ScalarValue from it.
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            // Decimals are serialized as big-endian byte vectors plus
            // precision (`p`) and scale (`s`).
            Value::Decimal32Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal64Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // An empty timezone string encodes `None` on the wire.
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                // Both the key (index) type and the boxed inner scalar are
                // required; the inner scalar is decoded recursively.
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                // Mode discriminant: 0 = sparse, 1 = dense; anything else is
                // rejected.
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // Every union field must carry its Field message; one missing
                // field aborts the whole conversion.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let fields = UnionFields::new(ids, fields);
                let v_id = val.value_id as i8;
                // The active value is optional (a union scalar may be empty).
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                Self::FixedSizeBinary(v.length, Some(v.clone().values))
            }
        })
    }
}
625
626impl From<protobuf::TimeUnit> for TimeUnit {
627    fn from(time_unit: protobuf::TimeUnit) -> Self {
628        match time_unit {
629            protobuf::TimeUnit::Second => TimeUnit::Second,
630            protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
631            protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
632            protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
633        }
634    }
635}
636
637impl From<protobuf::IntervalUnit> for IntervalUnit {
638    fn from(interval_unit: protobuf::IntervalUnit) -> Self {
639        match interval_unit {
640            protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
641            protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
642            protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
643        }
644    }
645}
646
647impl From<protobuf::Constraints> for Constraints {
648    fn from(constraints: protobuf::Constraints) -> Self {
649        Constraints::new_unverified(
650            constraints
651                .constraints
652                .into_iter()
653                .map(|item| item.into())
654                .collect(),
655        )
656    }
657}
658
659impl From<protobuf::Constraint> for Constraint {
660    fn from(value: protobuf::Constraint) -> Self {
661        match value.constraint_mode.unwrap() {
662            protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
663                Constraint::PrimaryKey(
664                    elem.indices.into_iter().map(|item| item as usize).collect(),
665                )
666            }
667            protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
668                elem.indices.into_iter().map(|item| item as usize).collect(),
669            ),
670        }
671    }
672}
673
674impl From<&protobuf::ColumnStats> for ColumnStatistics {
675    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
676        ColumnStatistics {
677            null_count: if let Some(nc) = &cs.null_count {
678                nc.clone().into()
679            } else {
680                Precision::Absent
681            },
682            max_value: if let Some(max) = &cs.max_value {
683                max.clone().into()
684            } else {
685                Precision::Absent
686            },
687            min_value: if let Some(min) = &cs.min_value {
688                min.clone().into()
689            } else {
690                Precision::Absent
691            },
692            sum_value: if let Some(sum) = &cs.sum_value {
693                sum.clone().into()
694            } else {
695                Precision::Absent
696            },
697            distinct_count: if let Some(dc) = &cs.distinct_count {
698                dc.clone().into()
699            } else {
700                Precision::Absent
701            },
702            byte_size: if let Some(sbs) = &cs.byte_size {
703                sbs.clone().into()
704            } else {
705                Precision::Absent
706            },
707        }
708    }
709}
710
711impl From<protobuf::Precision> for Precision<usize> {
712    fn from(s: protobuf::Precision) -> Self {
713        let Ok(precision_type) = s.precision_info.try_into() else {
714            return Precision::Absent;
715        };
716        match precision_type {
717            protobuf::PrecisionInfo::Exact => {
718                if let Some(val) = s.val {
719                    if let Ok(ScalarValue::UInt64(Some(val))) =
720                        ScalarValue::try_from(&val)
721                    {
722                        Precision::Exact(val as usize)
723                    } else {
724                        Precision::Absent
725                    }
726                } else {
727                    Precision::Absent
728                }
729            }
730            protobuf::PrecisionInfo::Inexact => {
731                if let Some(val) = s.val {
732                    if let Ok(ScalarValue::UInt64(Some(val))) =
733                        ScalarValue::try_from(&val)
734                    {
735                        Precision::Inexact(val as usize)
736                    } else {
737                        Precision::Absent
738                    }
739                } else {
740                    Precision::Absent
741                }
742            }
743            protobuf::PrecisionInfo::Absent => Precision::Absent,
744        }
745    }
746}
747
748impl From<protobuf::Precision> for Precision<ScalarValue> {
749    fn from(s: protobuf::Precision) -> Self {
750        let Ok(precision_type) = s.precision_info.try_into() else {
751            return Precision::Absent;
752        };
753        match precision_type {
754            protobuf::PrecisionInfo::Exact => {
755                if let Some(val) = s.val {
756                    if let Ok(val) = ScalarValue::try_from(&val) {
757                        Precision::Exact(val)
758                    } else {
759                        Precision::Absent
760                    }
761                } else {
762                    Precision::Absent
763                }
764            }
765            protobuf::PrecisionInfo::Inexact => {
766                if let Some(val) = s.val {
767                    if let Ok(val) = ScalarValue::try_from(&val) {
768                        Precision::Inexact(val)
769                    } else {
770                        Precision::Absent
771                    }
772                } else {
773                    Precision::Absent
774                }
775            }
776            protobuf::PrecisionInfo::Absent => Precision::Absent,
777        }
778    }
779}
780
781impl From<protobuf::JoinSide> for JoinSide {
782    fn from(t: protobuf::JoinSide) -> Self {
783        match t {
784            protobuf::JoinSide::LeftSide => JoinSide::Left,
785            protobuf::JoinSide::RightSide => JoinSide::Right,
786            protobuf::JoinSide::None => JoinSide::None,
787        }
788    }
789}
790
791impl From<&protobuf::Constraint> for Constraint {
792    fn from(value: &protobuf::Constraint) -> Self {
793        match &value.constraint_mode {
794            Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
795                Constraint::PrimaryKey(
796                    elem.indices.iter().map(|&item| item as usize).collect(),
797                )
798            }
799            Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
800                Constraint::Unique(
801                    elem.indices.iter().map(|&item| item as usize).collect(),
802                )
803            }
804            None => panic!("constraint_mode not set"),
805        }
806    }
807}
808
809impl TryFrom<&protobuf::Constraints> for Constraints {
810    type Error = DataFusionError;
811
812    fn try_from(
813        constraints: &protobuf::Constraints,
814    ) -> datafusion_common::Result<Self, Self::Error> {
815        Ok(Constraints::new_unverified(
816            constraints
817                .constraints
818                .iter()
819                .map(|item| item.into())
820                .collect(),
821        ))
822    }
823}
824
825impl TryFrom<&protobuf::Statistics> for Statistics {
826    type Error = DataFusionError;
827
828    fn try_from(
829        s: &protobuf::Statistics,
830    ) -> datafusion_common::Result<Self, Self::Error> {
831        // Keep it sync with Statistics::to_proto
832        Ok(Statistics {
833            num_rows: if let Some(nr) = &s.num_rows {
834                nr.clone().into()
835            } else {
836                Precision::Absent
837            },
838            total_byte_size: if let Some(tbs) = &s.total_byte_size {
839                tbs.clone().into()
840            } else {
841                Precision::Absent
842            },
843            // No column statistic (None) is encoded with empty array
844            column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
845        })
846    }
847}
848
849impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
850    fn from(value: protobuf::CompressionTypeVariant) -> Self {
851        match value {
852            protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
853            protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
854            protobuf::CompressionTypeVariant::Xz => Self::XZ,
855            protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
856            protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
857        }
858    }
859}
860
861impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
862    fn from(value: CompressionTypeVariant) -> Self {
863        match value {
864            CompressionTypeVariant::GZIP => Self::Gzip,
865            CompressionTypeVariant::BZIP2 => Self::Bzip2,
866            CompressionTypeVariant::XZ => Self::Xz,
867            CompressionTypeVariant::ZSTD => Self::Zstd,
868            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
869        }
870    }
871}
872
873impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
874    type Error = DataFusionError;
875
876    fn try_from(
877        opts: &protobuf::CsvWriterOptions,
878    ) -> datafusion_common::Result<Self, Self::Error> {
879        let write_options = csv_writer_options_from_proto(opts)?;
880        let compression: CompressionTypeVariant = opts.compression().into();
881        Ok(CsvWriterOptions::new(write_options, compression))
882    }
883}
884
885impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
886    type Error = DataFusionError;
887
888    fn try_from(
889        opts: &protobuf::JsonWriterOptions,
890    ) -> datafusion_common::Result<Self, Self::Error> {
891        let compression: CompressionTypeVariant = opts.compression().into();
892        Ok(JsonWriterOptions::new(compression))
893    }
894}
895
896impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
897    type Error = DataFusionError;
898
899    fn try_from(
900        proto_opts: &protobuf::CsvOptions,
901    ) -> datafusion_common::Result<Self, Self::Error> {
902        Ok(CsvOptions {
903            has_header: proto_opts.has_header.first().map(|h| *h != 0),
904            delimiter: proto_opts.delimiter[0],
905            quote: proto_opts.quote[0],
906            terminator: proto_opts.terminator.first().copied(),
907            escape: proto_opts.escape.first().copied(),
908            double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
909            newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
910            compression: proto_opts.compression().into(),
911            compression_level: proto_opts.compression_level,
912            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
913            date_format: (!proto_opts.date_format.is_empty())
914                .then(|| proto_opts.date_format.clone()),
915            datetime_format: (!proto_opts.datetime_format.is_empty())
916                .then(|| proto_opts.datetime_format.clone()),
917            timestamp_format: (!proto_opts.timestamp_format.is_empty())
918                .then(|| proto_opts.timestamp_format.clone()),
919            timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
920                .then(|| proto_opts.timestamp_tz_format.clone()),
921            time_format: (!proto_opts.time_format.is_empty())
922                .then(|| proto_opts.time_format.clone()),
923            null_value: (!proto_opts.null_value.is_empty())
924                .then(|| proto_opts.null_value.clone()),
925            null_regex: (!proto_opts.null_regex.is_empty())
926                .then(|| proto_opts.null_regex.clone()),
927            comment: proto_opts.comment.first().copied(),
928            truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
929        })
930    }
931}
932
933impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
934    type Error = DataFusionError;
935
936    fn try_from(
937        value: &protobuf::ParquetOptions,
938    ) -> datafusion_common::Result<Self, Self::Error> {
939        Ok(ParquetOptions {
940            enable_page_index: value.enable_page_index,
941            pruning: value.pruning,
942            skip_metadata: value.skip_metadata,
943            metadata_size_hint: value
944                .metadata_size_hint_opt
945                .map(|opt| match opt {
946                    protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
947                })
948                .unwrap_or(None),
949            pushdown_filters: value.pushdown_filters,
950            reorder_filters: value.reorder_filters,
951            force_filter_selections: value.force_filter_selections,
952            data_pagesize_limit: value.data_pagesize_limit as usize,
953            write_batch_size: value.write_batch_size as usize,
954            writer_version: value.writer_version.parse().map_err(|e| {
955                DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
956            })?,
957            compression: value.compression_opt.clone().map(|opt| match opt {
958                protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
959            }).unwrap_or(None),
960            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
961            // Continuing from where we left off in the TryFrom implementation
962            dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
963            statistics_enabled: value
964                .statistics_enabled_opt.clone()
965                .map(|opt| match opt {
966                    protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
967                })
968                .unwrap_or(None),
969            max_row_group_size: value.max_row_group_size as usize,
970            created_by: value.created_by.clone(),
971            column_index_truncate_length: value
972                .column_index_truncate_length_opt.as_ref()
973                .map(|opt| match opt {
974                    protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
975                })
976                .unwrap_or(None),
977            statistics_truncate_length: value
978                .statistics_truncate_length_opt.as_ref()
979                .map(|opt| match opt {
980                    protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
981                })
982                .unwrap_or(None),
983            data_page_row_count_limit: value.data_page_row_count_limit as usize,
984            encoding: value
985                .encoding_opt.clone()
986                .map(|opt| match opt {
987                    protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
988                })
989                .unwrap_or(None),
990            bloom_filter_on_read: value.bloom_filter_on_read,
991            bloom_filter_on_write: value.bloom_filter_on_write,
992            bloom_filter_fpp: value.clone()
993                .bloom_filter_fpp_opt
994                .map(|opt| match opt {
995                    protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
996                })
997                .unwrap_or(None),
998            bloom_filter_ndv: value.clone()
999                .bloom_filter_ndv_opt
1000                .map(|opt| match opt {
1001                    protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1002                })
1003                .unwrap_or(None),
1004            allow_single_file_parallelism: value.allow_single_file_parallelism,
1005            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
1006            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
1007            schema_force_view_types: value.schema_force_view_types,
1008            binary_as_string: value.binary_as_string,
1009            coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1010                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1011            }).unwrap_or(None),
1012            skip_arrow_metadata: value.skip_arrow_metadata,
1013            max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1014                protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1015            }).unwrap_or(None),
1016        })
1017    }
1018}
1019
1020impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1021    type Error = DataFusionError;
1022    fn try_from(
1023        value: &protobuf::ParquetColumnOptions,
1024    ) -> datafusion_common::Result<Self, Self::Error> {
1025        Ok(ParquetColumnOptions {
1026            compression: value.compression_opt.clone().map(|opt| match opt {
1027                protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1028            }).unwrap_or(None),
1029            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1030            statistics_enabled: value
1031                .statistics_enabled_opt.clone()
1032                .map(|opt| match opt {
1033                    protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1034                })
1035                .unwrap_or(None),
1036            encoding: value
1037                .encoding_opt.clone()
1038                .map(|opt| match opt {
1039                    protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1040                })
1041                .unwrap_or(None),
1042            bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1043                protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1044            })
1045                .unwrap_or(None),
1046            bloom_filter_fpp: value
1047                .bloom_filter_fpp_opt
1048                .map(|opt| match opt {
1049                    protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1050                })
1051                .unwrap_or(None),
1052            bloom_filter_ndv: value
1053                .bloom_filter_ndv_opt
1054                .map(|opt| match opt {
1055                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1056                })
1057                .unwrap_or(None),
1058        })
1059    }
1060}
1061
1062impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1063    type Error = DataFusionError;
1064    fn try_from(
1065        value: &protobuf::TableParquetOptions,
1066    ) -> datafusion_common::Result<Self, Self::Error> {
1067        let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1068            HashMap::new();
1069        for protobuf::ParquetColumnSpecificOptions {
1070            column_name,
1071            options: maybe_options,
1072        } in &value.column_specific_options
1073        {
1074            if let Some(options) = maybe_options {
1075                column_specific_options.insert(column_name.clone(), options.try_into()?);
1076            }
1077        }
1078        Ok(TableParquetOptions {
1079            global: value
1080                .global
1081                .as_ref()
1082                .map(|v| v.try_into())
1083                .unwrap()
1084                .unwrap(),
1085            column_specific_options,
1086            key_value_metadata: Default::default(),
1087            crypto: Default::default(),
1088        })
1089    }
1090}
1091
1092impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1093    type Error = DataFusionError;
1094
1095    fn try_from(
1096        proto_opts: &protobuf::JsonOptions,
1097    ) -> datafusion_common::Result<Self, Self::Error> {
1098        let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1099        Ok(JsonOptions {
1100            compression: compression.into(),
1101            compression_level: proto_opts.compression_level,
1102            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1103        })
1104    }
1105}
1106
1107pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1108    protobuf::TimeUnit::try_from(*value)
1109        .map(|t| t.into())
1110        .map_err(|_| Error::unknown("TimeUnit", *value))
1111}
1112
1113pub fn parse_i32_to_interval_unit(
1114    value: &i32,
1115) -> datafusion_common::Result<IntervalUnit, Error> {
1116    protobuf::IntervalUnit::try_from(*value)
1117        .map(|t| t.into())
1118        .map_err(|_| Error::unknown("IntervalUnit", *value))
1119}
1120
/// Converts a `Vec<T>` into a fixed-size array.
///
/// Panics when the length differs from `N`; there is no better way to
/// surface this at the call sites, which all pass known-length vectors.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    let actual = v.len();
    v.try_into().unwrap_or_else(|_: Vec<T>| {
        panic!("Expected a Vec of length {} but it was {}", N, actual)
    })
}
1127
1128/// Converts a vector of `protobuf::Field`s to `Arc<arrow::Field>`s.
1129pub fn parse_proto_fields_to_fields<'a, I>(
1130    fields: I,
1131) -> std::result::Result<Vec<Field>, Error>
1132where
1133    I: IntoIterator<Item = &'a protobuf::Field>,
1134{
1135    fields
1136        .into_iter()
1137        .map(Field::try_from)
1138        .collect::<datafusion_common::Result<_, _>>()
1139}
1140
1141pub(crate) fn csv_writer_options_from_proto(
1142    writer_options: &protobuf::CsvWriterOptions,
1143) -> datafusion_common::Result<WriterBuilder> {
1144    let mut builder = WriterBuilder::new();
1145    if !writer_options.delimiter.is_empty() {
1146        if let Some(delimiter) = writer_options.delimiter.chars().next() {
1147            if delimiter.is_ascii() {
1148                builder = builder.with_delimiter(delimiter as u8);
1149            } else {
1150                return Err(proto_error("CSV Delimiter is not ASCII"));
1151            }
1152        } else {
1153            return Err(proto_error("Error parsing CSV Delimiter"));
1154        }
1155    }
1156    if !writer_options.quote.is_empty() {
1157        if let Some(quote) = writer_options.quote.chars().next() {
1158            if quote.is_ascii() {
1159                builder = builder.with_quote(quote as u8);
1160            } else {
1161                return Err(proto_error("CSV Quote is not ASCII"));
1162            }
1163        } else {
1164            return Err(proto_error("Error parsing CSV Quote"));
1165        }
1166    }
1167    if !writer_options.escape.is_empty() {
1168        if let Some(escape) = writer_options.escape.chars().next() {
1169            if escape.is_ascii() {
1170                builder = builder.with_escape(escape as u8);
1171            } else {
1172                return Err(proto_error("CSV Escape is not ASCII"));
1173            }
1174        } else {
1175            return Err(proto_error("Error parsing CSV Escape"));
1176        }
1177    }
1178    Ok(builder
1179        .with_header(writer_options.has_header)
1180        .with_date_format(writer_options.date_format.clone())
1181        .with_datetime_format(writer_options.datetime_format.clone())
1182        .with_timestamp_format(writer_options.timestamp_format.clone())
1183        .with_time_format(writer_options.time_format.clone())
1184        .with_null(writer_options.null_value.clone())
1185        .with_double_quote(writer_options.double_quote))
1186}