Skip to main content

datafusion_proto_common/from_proto/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::common::proto_error;
22use crate::protobuf_common as protobuf;
23use arrow::array::{ArrayRef, AsArray};
24use arrow::buffer::Buffer;
25use arrow::csv::{QuoteStyle, WriterBuilder};
26use arrow::datatypes::{
27    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
28    TimeUnit, UnionFields, UnionMode, i256,
29};
30use arrow::ipc::{
31    convert::fb_to_schema,
32    reader::{read_dictionary, read_record_batch},
33    root_as_message,
34    writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
35};
36
37use datafusion_common::{
38    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
39    DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
40    arrow_datafusion_err,
41    config::{
42        CsvOptions, JsonOptions, ParquetCdcOptions, ParquetColumnOptions, ParquetOptions,
43        TableParquetOptions,
44    },
45    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
46    parsers::CompressionTypeVariant,
47    plan_datafusion_err,
48    stats::Precision,
49};
50
51#[derive(Debug)]
52pub enum Error {
53    General(String),
54
55    DataFusionError(DataFusionError),
56
57    MissingRequiredField(String),
58
59    AtLeastOneValue(String),
60
61    UnknownEnumVariant { name: String, value: i32 },
62}
63
64impl std::fmt::Display for Error {
65    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
66        match self {
67            Self::General(desc) => write!(f, "General error: {desc}"),
68
69            Self::DataFusionError(desc) => {
70                write!(f, "DataFusion error: {desc:?}")
71            }
72
73            Self::MissingRequiredField(name) => {
74                write!(f, "Missing required field {name}")
75            }
76            Self::AtLeastOneValue(name) => {
77                write!(f, "Must have at least one {name}, found 0")
78            }
79            Self::UnknownEnumVariant { name, value } => {
80                write!(f, "Unknown i32 value for {name} enum: {value}")
81            }
82        }
83    }
84}
85
86impl std::error::Error for Error {}
87
88impl From<DataFusionError> for Error {
89    fn from(e: DataFusionError) -> Self {
90        Error::DataFusionError(e)
91    }
92}
93
94impl Error {
95    pub fn required(field: impl Into<String>) -> Error {
96        Error::MissingRequiredField(field.into())
97    }
98
99    pub fn unknown(name: impl Into<String>, value: i32) -> Error {
100        Error::UnknownEnumVariant {
101            name: name.into(),
102            value,
103        }
104    }
105}
106
107impl From<Error> for DataFusionError {
108    fn from(e: Error) -> Self {
109        plan_datafusion_err!("{}", e)
110    }
111}
112
113/// An extension trait that adds the methods `optional` and `required` to any
114/// Option containing a type implementing `TryInto<U, Error = Error>`
115pub trait FromOptionalField<T> {
116    /// Converts an optional protobuf field to an option of a different type
117    ///
118    /// Returns None if the option is None, otherwise calls [`TryInto::try_into`]
119    /// on the contained data, returning any error encountered
120    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;
121
122    /// Converts an optional protobuf field to a different type, returning an error if None
123    ///
124    /// Returns `Error::MissingRequiredField` if None, otherwise calls [`TryInto::try_into`]
125    /// on the contained data, returning any error encountered
126    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
127}
128
129impl<T, U> FromOptionalField<U> for Option<T>
130where
131    T: TryInto<U, Error = Error>,
132{
133    fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
134        self.map(|t| t.try_into()).transpose()
135    }
136
137    fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
138        match self {
139            None => Err(Error::required(field)),
140            Some(t) => t.try_into(),
141        }
142    }
143}
144
145impl From<protobuf::ColumnRelation> for TableReference {
146    fn from(rel: protobuf::ColumnRelation) -> Self {
147        Self::parse_str_normalized(rel.relation.as_str(), true)
148    }
149}
150
151impl From<protobuf::Column> for Column {
152    fn from(c: protobuf::Column) -> Self {
153        let protobuf::Column { relation, name } = c;
154
155        Self::new(relation, name)
156    }
157}
158
159impl From<&protobuf::Column> for Column {
160    fn from(c: &protobuf::Column) -> Self {
161        c.clone().into()
162    }
163}
164
165impl TryFrom<&protobuf::DfSchema> for DFSchema {
166    type Error = Error;
167
168    fn try_from(
169        df_schema: &protobuf::DfSchema,
170    ) -> datafusion_common::Result<Self, Self::Error> {
171        let df_fields = df_schema.columns.clone();
172        let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
173            .iter()
174            .map(|df_field| {
175                let field: Field = df_field.field.as_ref().required("field")?;
176                Ok((
177                    df_field.qualifier.as_ref().map(|q| q.clone().into()),
178                    Arc::new(field),
179                ))
180            })
181            .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
182
183        Ok(DFSchema::new_with_metadata(
184            qualifiers_and_fields,
185            df_schema.metadata.clone(),
186        )?)
187    }
188}
189
190impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
191    type Error = Error;
192
193    fn try_from(
194        df_schema: protobuf::DfSchema,
195    ) -> datafusion_common::Result<Self, Self::Error> {
196        let dfschema: DFSchema = (&df_schema).try_into()?;
197        Ok(Arc::new(dfschema))
198    }
199}
200
201impl TryFrom<&protobuf::ArrowType> for DataType {
202    type Error = Error;
203
204    fn try_from(
205        arrow_type: &protobuf::ArrowType,
206    ) -> datafusion_common::Result<Self, Self::Error> {
207        arrow_type
208            .arrow_type_enum
209            .as_ref()
210            .required("arrow_type_enum")
211    }
212}
213
214impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
215    type Error = Error;
216    fn try_from(
217        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
218    ) -> datafusion_common::Result<Self, Self::Error> {
219        use protobuf::arrow_type;
220        Ok(match arrow_type_enum {
221            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
222            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
223            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
224            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
225            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
226            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
227            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
228            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
229            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
230            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
231            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
232            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
233            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
234            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
235            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
236            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
237            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
238            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
239            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
240                DataType::FixedSizeBinary(*size)
241            }
242            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
243            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
244            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
245            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
246                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
247            }
248            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
249                time_unit,
250                timezone,
251            }) => DataType::Timestamp(
252                parse_i32_to_time_unit(time_unit)?,
253                match timezone.len() {
254                    0 => None,
255                    _ => Some(timezone.as_str().into()),
256                },
257            ),
258            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
259                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
260            }
261            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
262                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
263            }
264            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
265                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
266            }
267            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
268                precision,
269                scale,
270            }) => DataType::Decimal32(*precision as u8, *scale as i8),
271            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
272                precision,
273                scale,
274            }) => DataType::Decimal64(*precision as u8, *scale as i8),
275            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
276                precision,
277                scale,
278            }) => DataType::Decimal128(*precision as u8, *scale as i8),
279            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
280                precision,
281                scale,
282            }) => DataType::Decimal256(*precision as u8, *scale as i8),
283            arrow_type::ArrowTypeEnum::List(list) => {
284                let list_type =
285                    list.as_ref().field_type.as_deref().required("field_type")?;
286                DataType::List(Arc::new(list_type))
287            }
288            arrow_type::ArrowTypeEnum::LargeList(list) => {
289                let list_type =
290                    list.as_ref().field_type.as_deref().required("field_type")?;
291                DataType::LargeList(Arc::new(list_type))
292            }
293            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
294                let list_type =
295                    list.as_ref().field_type.as_deref().required("field_type")?;
296                let list_size = list.list_size;
297                DataType::FixedSizeList(Arc::new(list_type), list_size)
298            }
299            arrow_type::ArrowTypeEnum::ListView(list) => {
300                let list_type =
301                    list.as_ref().field_type.as_deref().required("field_type")?;
302                DataType::ListView(Arc::new(list_type))
303            }
304            arrow_type::ArrowTypeEnum::LargeListView(list) => {
305                let list_type =
306                    list.as_ref().field_type.as_deref().required("field_type")?;
307                DataType::LargeListView(Arc::new(list_type))
308            }
309            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
310                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
311            ),
312            arrow_type::ArrowTypeEnum::Union(union) => {
313                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
314                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
315                let union_mode = match union_mode {
316                    protobuf::UnionMode::Dense => UnionMode::Dense,
317                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
318                };
319                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;
320
321                // Default to index based type ids if not explicitly provided
322                let union_fields = if union.type_ids.is_empty() {
323                    UnionFields::from_fields(union_fields)
324                } else {
325                    let type_ids = union.type_ids.iter().map(|i| *i as i8);
326                    UnionFields::try_new(type_ids, union_fields).map_err(|e| {
327                        DataFusionError::from(e).context("Deserializing Union DataType")
328                    })?
329                };
330                DataType::Union(union_fields, union_mode)
331            }
332            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
333                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
334                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
335                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
336            }
337            arrow_type::ArrowTypeEnum::Map(map) => {
338                let field: Field =
339                    map.as_ref().field_type.as_deref().required("field_type")?;
340                let keys_sorted = map.keys_sorted;
341                DataType::Map(Arc::new(field), keys_sorted)
342            }
343            arrow_type::ArrowTypeEnum::RunEndEncoded(run_end_encoded) => {
344                let run_ends_field: Field = run_end_encoded
345                    .as_ref()
346                    .run_ends_field
347                    .as_deref()
348                    .required("run_ends_field")?;
349                let value_field: Field = run_end_encoded
350                    .as_ref()
351                    .values_field
352                    .as_deref()
353                    .required("values_field")?;
354                DataType::RunEndEncoded(run_ends_field.into(), value_field.into())
355            }
356        })
357    }
358}
359
360impl TryFrom<&protobuf::Field> for Field {
361    type Error = Error;
362    fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
363        let datatype = field.arrow_type.as_deref().required("arrow_type")?;
364        let field = Self::new(field.name.as_str(), datatype, field.nullable)
365            .with_metadata(field.metadata.clone());
366        Ok(field)
367    }
368}
369
370impl TryFrom<&protobuf::Schema> for Schema {
371    type Error = Error;
372
373    fn try_from(
374        schema: &protobuf::Schema,
375    ) -> datafusion_common::Result<Self, Self::Error> {
376        let fields = schema
377            .columns
378            .iter()
379            .map(Field::try_from)
380            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
381        Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
382    }
383}
384
385impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
386    type Error = Error;
387
388    fn try_from(
389        scalar: &protobuf::ScalarValue,
390    ) -> datafusion_common::Result<Self, Self::Error> {
391        use protobuf::scalar_value::Value;
392
393        let value = scalar
394            .value
395            .as_ref()
396            .ok_or_else(|| Error::required("value"))?;
397
398        Ok(match value {
399            Value::BoolValue(v) => Self::Boolean(Some(*v)),
400            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
401            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
402            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
403            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
404            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
405            Value::Int32Value(v) => Self::Int32(Some(*v)),
406            Value::Int64Value(v) => Self::Int64(Some(*v)),
407            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
408            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
409            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
410            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
411            Value::Float32Value(v) => Self::Float32(Some(*v)),
412            Value::Float64Value(v) => Self::Float64(Some(*v)),
413            Value::Date32Value(v) => Self::Date32(Some(*v)),
414            // Nested ScalarValue types are serialized using arrow IPC format
415            Value::ListValue(v)
416            | Value::FixedSizeListValue(v)
417            | Value::LargeListValue(v)
418            | Value::ListViewValue(v)
419            | Value::LargeListViewValue(v)
420            | Value::StructValue(v)
421            | Value::MapValue(v) => {
422                let protobuf::ScalarNestedValue {
423                    ipc_message,
424                    arrow_data,
425                    dictionaries,
426                    schema,
427                } = &v;
428
429                let schema: Schema = if let Some(schema_ref) = schema {
430                    schema_ref.try_into()?
431                } else {
432                    return Err(Error::General(
433                        "Invalid schema while deserializing nested ScalarValue"
434                            .to_string(),
435                    ));
436                };
437
438                // IPC dictionary batch IDs are assigned when encoding the schema, but our protobuf
439                // `Schema` doesn't preserve those IDs. Reconstruct them deterministically by
440                // round-tripping the schema through IPC.
441                let schema: Schema = {
442                    let ipc_gen = IpcDataGenerator {};
443                    let write_options = IpcWriteOptions::default();
444                    let mut dict_tracker = DictionaryTracker::new(false);
445                    let encoded_schema = ipc_gen.schema_to_bytes_with_dictionary_tracker(
446                        &schema,
447                        &mut dict_tracker,
448                        &write_options,
449                    );
450                    let message =
451                        root_as_message(encoded_schema.ipc_message.as_slice()).map_err(
452                            |e| {
453                                Error::General(format!(
454                                    "Error IPC schema message while deserializing nested ScalarValue: {e}"
455                                ))
456                            },
457                        )?;
458                    let ipc_schema = message.header_as_schema().ok_or_else(|| {
459                        Error::General(
460                            "Unexpected message type deserializing nested ScalarValue schema"
461                                .to_string(),
462                        )
463                    })?;
464                    fb_to_schema(ipc_schema)
465                };
466
467                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
468                    Error::General(format!(
469                        "Error IPC message while deserializing nested ScalarValue: {e}"
470                    ))
471                })?;
472                let buffer = Buffer::from(arrow_data.as_slice());
473
474                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
475                    Error::General(
476                        "Unexpected message type deserializing nested ScalarValue"
477                            .to_string(),
478                    )
479                })?;
480
481                let mut dict_by_id: HashMap<i64, ArrayRef> = HashMap::new();
482                for protobuf::scalar_nested_value::Dictionary {
483                    ipc_message,
484                    arrow_data,
485                } in dictionaries
486                {
487                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
488                        Error::General(format!(
489                            "Error IPC message while deserializing nested ScalarValue dictionary message: {e}"
490                        ))
491                    })?;
492                    let buffer = Buffer::from(arrow_data.as_slice());
493
494                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
495                        Error::General(
496                            "Unexpected message type deserializing nested ScalarValue dictionary message"
497                                .to_string(),
498                        )
499                    })?;
500                    read_dictionary(
501                        &buffer,
502                        dict_batch,
503                        &schema,
504                        &mut dict_by_id,
505                        &message.version(),
506                    )
507                    .map_err(|e| arrow_datafusion_err!(e))
508                    .map_err(|e| e.context("Decoding nested ScalarValue dictionary"))?;
509                }
510
511                let record_batch = read_record_batch(
512                    &buffer,
513                    ipc_batch,
514                    Arc::new(schema),
515                    &dict_by_id,
516                    None,
517                    &message.version(),
518                )
519                .map_err(|e| arrow_datafusion_err!(e))
520                .map_err(|e| e.context("Decoding nested ScalarValue value"))?;
521                let arr = record_batch.column(0);
522                match value {
523                    Value::ListValue(_) => {
524                        Self::List(arr.as_list::<i32>().to_owned().into())
525                    }
526                    Value::LargeListValue(_) => {
527                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
528                    }
529                    Value::FixedSizeListValue(_) => {
530                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
531                    }
532                    Value::ListViewValue(_) => {
533                        Self::ListView(arr.as_list_view::<i32>().to_owned().into())
534                    }
535                    Value::LargeListViewValue(_) => {
536                        Self::LargeListView(arr.as_list_view::<i64>().to_owned().into())
537                    }
538                    Value::StructValue(_) => {
539                        Self::Struct(arr.as_struct().to_owned().into())
540                    }
541                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
542                    _ => unreachable!(),
543                }
544            }
545            Value::NullValue(v) => {
546                let null_type: DataType = v.try_into()?;
547                null_type.try_into().map_err(Error::DataFusionError)?
548            }
549            Value::Decimal32Value(val) => {
550                let array = vec_to_array(val.value.clone());
551                Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
552            }
553            Value::Decimal64Value(val) => {
554                let array = vec_to_array(val.value.clone());
555                Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
556            }
557            Value::Decimal128Value(val) => {
558                let array = vec_to_array(val.value.clone());
559                Self::Decimal128(
560                    Some(i128::from_be_bytes(array)),
561                    val.p as u8,
562                    val.s as i8,
563                )
564            }
565            Value::Decimal256Value(val) => {
566                let array = vec_to_array(val.value.clone());
567                Self::Decimal256(
568                    Some(i256::from_be_bytes(array)),
569                    val.p as u8,
570                    val.s as i8,
571                )
572            }
573            Value::Date64Value(v) => Self::Date64(Some(*v)),
574            Value::Time32Value(v) => {
575                let time_value =
576                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
577                match time_value {
578                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
579                        Self::Time32Second(Some(*t))
580                    }
581                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
582                        Self::Time32Millisecond(Some(*t))
583                    }
584                }
585            }
586            Value::Time64Value(v) => {
587                let time_value =
588                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
589                match time_value {
590                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
591                        Self::Time64Microsecond(Some(*t))
592                    }
593                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
594                        Self::Time64Nanosecond(Some(*t))
595                    }
596                }
597            }
598            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
599            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
600            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
601            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
602            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
603            Value::TimestampValue(v) => {
604                let timezone = if v.timezone.is_empty() {
605                    None
606                } else {
607                    Some(v.timezone.as_str().into())
608                };
609
610                let ts_value =
611                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
612
613                match ts_value {
614                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
615                        Self::TimestampMicrosecond(Some(*t), timezone)
616                    }
617                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
618                        Self::TimestampNanosecond(Some(*t), timezone)
619                    }
620                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
621                        Self::TimestampSecond(Some(*t), timezone)
622                    }
623                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
624                        Self::TimestampMillisecond(Some(*t), timezone)
625                    }
626                }
627            }
628            Value::DictionaryValue(v) => {
629                let index_type: DataType = v
630                    .index_type
631                    .as_ref()
632                    .ok_or_else(|| Error::required("index_type"))?
633                    .try_into()?;
634
635                let value: Self = v
636                    .value
637                    .as_ref()
638                    .ok_or_else(|| Error::required("value"))?
639                    .as_ref()
640                    .try_into()?;
641
642                Self::Dictionary(Box::new(index_type), Box::new(value))
643            }
644            Value::RunEndEncodedValue(v) => {
645                let run_ends_field: Field = v
646                    .run_ends_field
647                    .as_ref()
648                    .ok_or_else(|| Error::required("run_ends_field"))?
649                    .try_into()?;
650
651                let values_field: Field = v
652                    .values_field
653                    .as_ref()
654                    .ok_or_else(|| Error::required("values_field"))?
655                    .try_into()?;
656
657                let value: Self = v
658                    .value
659                    .as_ref()
660                    .ok_or_else(|| Error::required("value"))?
661                    .as_ref()
662                    .try_into()?;
663
664                Self::RunEndEncoded(
665                    run_ends_field.into(),
666                    values_field.into(),
667                    Box::new(value),
668                )
669            }
670            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
671            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
672            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
673            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
674                IntervalDayTimeType::make_value(v.days, v.milliseconds),
675            )),
676            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
677                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
678            )),
679            Value::UnionValue(val) => {
680                let mode = match val.mode {
681                    0 => UnionMode::Sparse,
682                    1 => UnionMode::Dense,
683                    id => Err(Error::unknown("UnionMode", id))?,
684                };
685                let ids = val
686                    .fields
687                    .iter()
688                    .map(|f| f.field_id as i8)
689                    .collect::<Vec<_>>();
690                let fields = val
691                    .fields
692                    .iter()
693                    .map(|f| f.field.clone())
694                    .collect::<Option<Vec<_>>>();
695                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
696                let fields = parse_proto_fields_to_fields(&fields)?;
697                let union_fields = UnionFields::try_new(ids, fields).map_err(|e| {
698                    DataFusionError::from(e).context("Deserializing Union ScalarValue")
699                })?;
700                let v_id = val.value_id as i8;
701                let val = match &val.value {
702                    None => None,
703                    Some(val) => {
704                        let val: ScalarValue = val
705                            .as_ref()
706                            .try_into()
707                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
708                        Some((v_id, Box::new(val)))
709                    }
710                };
711                Self::Union(val, union_fields, mode)
712            }
713            Value::FixedSizeBinaryValue(v) => {
714                Self::FixedSizeBinary(v.length, Some(v.clone().values))
715            }
716        })
717    }
718}
719
720impl From<protobuf::TimeUnit> for TimeUnit {
721    fn from(time_unit: protobuf::TimeUnit) -> Self {
722        match time_unit {
723            protobuf::TimeUnit::Second => TimeUnit::Second,
724            protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
725            protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
726            protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
727        }
728    }
729}
730
731impl From<protobuf::IntervalUnit> for IntervalUnit {
732    fn from(interval_unit: protobuf::IntervalUnit) -> Self {
733        match interval_unit {
734            protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
735            protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
736            protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
737        }
738    }
739}
740
741impl From<protobuf::Constraints> for Constraints {
742    fn from(constraints: protobuf::Constraints) -> Self {
743        Constraints::new_unverified(
744            constraints
745                .constraints
746                .into_iter()
747                .map(|item| item.into())
748                .collect(),
749        )
750    }
751}
752
753impl From<protobuf::Constraint> for Constraint {
754    fn from(value: protobuf::Constraint) -> Self {
755        match value.constraint_mode.unwrap() {
756            protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
757                Constraint::PrimaryKey(
758                    elem.indices.into_iter().map(|item| item as usize).collect(),
759                )
760            }
761            protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
762                elem.indices.into_iter().map(|item| item as usize).collect(),
763            ),
764        }
765    }
766}
767
768impl From<&protobuf::ColumnStats> for ColumnStatistics {
769    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
770        ColumnStatistics {
771            null_count: if let Some(nc) = &cs.null_count {
772                nc.clone().into()
773            } else {
774                Precision::Absent
775            },
776            max_value: if let Some(max) = &cs.max_value {
777                max.clone().into()
778            } else {
779                Precision::Absent
780            },
781            min_value: if let Some(min) = &cs.min_value {
782                min.clone().into()
783            } else {
784                Precision::Absent
785            },
786            sum_value: if let Some(sum) = &cs.sum_value {
787                sum.clone().into()
788            } else {
789                Precision::Absent
790            },
791            distinct_count: if let Some(dc) = &cs.distinct_count {
792                dc.clone().into()
793            } else {
794                Precision::Absent
795            },
796            byte_size: if let Some(sbs) = &cs.byte_size {
797                sbs.clone().into()
798            } else {
799                Precision::Absent
800            },
801        }
802    }
803}
804
805impl From<protobuf::Precision> for Precision<usize> {
806    fn from(s: protobuf::Precision) -> Self {
807        let Ok(precision_type) = s.precision_info.try_into() else {
808            return Precision::Absent;
809        };
810        match precision_type {
811            protobuf::PrecisionInfo::Exact => {
812                if let Some(val) = s.val {
813                    if let Ok(ScalarValue::UInt64(Some(val))) =
814                        ScalarValue::try_from(&val)
815                    {
816                        Precision::Exact(val as usize)
817                    } else {
818                        Precision::Absent
819                    }
820                } else {
821                    Precision::Absent
822                }
823            }
824            protobuf::PrecisionInfo::Inexact => {
825                if let Some(val) = s.val {
826                    if let Ok(ScalarValue::UInt64(Some(val))) =
827                        ScalarValue::try_from(&val)
828                    {
829                        Precision::Inexact(val as usize)
830                    } else {
831                        Precision::Absent
832                    }
833                } else {
834                    Precision::Absent
835                }
836            }
837            protobuf::PrecisionInfo::Absent => Precision::Absent,
838        }
839    }
840}
841
842impl From<protobuf::Precision> for Precision<ScalarValue> {
843    fn from(s: protobuf::Precision) -> Self {
844        let Ok(precision_type) = s.precision_info.try_into() else {
845            return Precision::Absent;
846        };
847        match precision_type {
848            protobuf::PrecisionInfo::Exact => {
849                if let Some(val) = s.val {
850                    if let Ok(val) = ScalarValue::try_from(&val) {
851                        Precision::Exact(val)
852                    } else {
853                        Precision::Absent
854                    }
855                } else {
856                    Precision::Absent
857                }
858            }
859            protobuf::PrecisionInfo::Inexact => {
860                if let Some(val) = s.val {
861                    if let Ok(val) = ScalarValue::try_from(&val) {
862                        Precision::Inexact(val)
863                    } else {
864                        Precision::Absent
865                    }
866                } else {
867                    Precision::Absent
868                }
869            }
870            protobuf::PrecisionInfo::Absent => Precision::Absent,
871        }
872    }
873}
874
875impl From<protobuf::JoinSide> for JoinSide {
876    fn from(t: protobuf::JoinSide) -> Self {
877        match t {
878            protobuf::JoinSide::LeftSide => JoinSide::Left,
879            protobuf::JoinSide::RightSide => JoinSide::Right,
880            protobuf::JoinSide::None => JoinSide::None,
881        }
882    }
883}
884
885impl From<&protobuf::Constraint> for Constraint {
886    fn from(value: &protobuf::Constraint) -> Self {
887        match &value.constraint_mode {
888            Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
889                Constraint::PrimaryKey(
890                    elem.indices.iter().map(|&item| item as usize).collect(),
891                )
892            }
893            Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
894                Constraint::Unique(
895                    elem.indices.iter().map(|&item| item as usize).collect(),
896                )
897            }
898            None => panic!("constraint_mode not set"),
899        }
900    }
901}
902
903impl TryFrom<&protobuf::Constraints> for Constraints {
904    type Error = DataFusionError;
905
906    fn try_from(
907        constraints: &protobuf::Constraints,
908    ) -> datafusion_common::Result<Self, Self::Error> {
909        Ok(Constraints::new_unverified(
910            constraints
911                .constraints
912                .iter()
913                .map(|item| item.into())
914                .collect(),
915        ))
916    }
917}
918
919impl TryFrom<&protobuf::Statistics> for Statistics {
920    type Error = DataFusionError;
921
922    fn try_from(
923        s: &protobuf::Statistics,
924    ) -> datafusion_common::Result<Self, Self::Error> {
925        // Keep it sync with Statistics::to_proto
926        Ok(Statistics {
927            num_rows: if let Some(nr) = &s.num_rows {
928                nr.clone().into()
929            } else {
930                Precision::Absent
931            },
932            total_byte_size: if let Some(tbs) = &s.total_byte_size {
933                tbs.clone().into()
934            } else {
935                Precision::Absent
936            },
937            // No column statistic (None) is encoded with empty array
938            column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
939        })
940    }
941}
942
943impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
944    fn from(value: protobuf::CompressionTypeVariant) -> Self {
945        match value {
946            protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
947            protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
948            protobuf::CompressionTypeVariant::Xz => Self::XZ,
949            protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
950            protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
951        }
952    }
953}
954
955impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
956    fn from(value: CompressionTypeVariant) -> Self {
957        match value {
958            CompressionTypeVariant::GZIP => Self::Gzip,
959            CompressionTypeVariant::BZIP2 => Self::Bzip2,
960            CompressionTypeVariant::XZ => Self::Xz,
961            CompressionTypeVariant::ZSTD => Self::Zstd,
962            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
963        }
964    }
965}
966
967impl From<protobuf::CsvQuoteStyle> for datafusion_common::parsers::CsvQuoteStyle {
968    fn from(value: protobuf::CsvQuoteStyle) -> Self {
969        match value {
970            protobuf::CsvQuoteStyle::Necessary => Self::Necessary,
971            protobuf::CsvQuoteStyle::Always => Self::Always,
972            protobuf::CsvQuoteStyle::NonNumeric => Self::NonNumeric,
973            protobuf::CsvQuoteStyle::Never => Self::Never,
974        }
975    }
976}
977
978impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
979    type Error = DataFusionError;
980
981    fn try_from(
982        opts: &protobuf::CsvWriterOptions,
983    ) -> datafusion_common::Result<Self, Self::Error> {
984        let write_options = csv_writer_options_from_proto(opts)?;
985        let compression: CompressionTypeVariant = opts.compression().into();
986        Ok(CsvWriterOptions::new(write_options, compression))
987    }
988}
989
990impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
991    type Error = DataFusionError;
992
993    fn try_from(
994        opts: &protobuf::JsonWriterOptions,
995    ) -> datafusion_common::Result<Self, Self::Error> {
996        let compression: CompressionTypeVariant = opts.compression().into();
997        Ok(JsonWriterOptions::new(compression))
998    }
999}
1000
1001impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
1002    type Error = DataFusionError;
1003
1004    fn try_from(
1005        proto_opts: &protobuf::CsvOptions,
1006    ) -> datafusion_common::Result<Self, Self::Error> {
1007        Ok(CsvOptions {
1008            has_header: proto_opts.has_header.first().map(|h| *h != 0),
1009            delimiter: proto_opts.delimiter[0],
1010            quote: proto_opts.quote[0],
1011            terminator: proto_opts.terminator.first().copied(),
1012            escape: proto_opts.escape.first().copied(),
1013            double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
1014            newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
1015            compression: proto_opts.compression().into(),
1016            compression_level: proto_opts.compression_level,
1017            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1018            date_format: (!proto_opts.date_format.is_empty())
1019                .then(|| proto_opts.date_format.clone()),
1020            datetime_format: (!proto_opts.datetime_format.is_empty())
1021                .then(|| proto_opts.datetime_format.clone()),
1022            timestamp_format: (!proto_opts.timestamp_format.is_empty())
1023                .then(|| proto_opts.timestamp_format.clone()),
1024            timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
1025                .then(|| proto_opts.timestamp_tz_format.clone()),
1026            time_format: (!proto_opts.time_format.is_empty())
1027                .then(|| proto_opts.time_format.clone()),
1028            null_value: (!proto_opts.null_value.is_empty())
1029                .then(|| proto_opts.null_value.clone()),
1030            null_regex: (!proto_opts.null_regex.is_empty())
1031                .then(|| proto_opts.null_regex.clone()),
1032            comment: proto_opts.comment.first().copied(),
1033            truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
1034            quote_style: proto_opts.quote_style().into(),
1035            ignore_leading_whitespace: proto_opts
1036                .ignore_leading_whitespace
1037                .first()
1038                .map(|h| *h != 0),
1039            ignore_trailing_whitespace: proto_opts
1040                .ignore_trailing_whitespace
1041                .first()
1042                .map(|h| *h != 0),
1043        })
1044    }
1045}
1046
1047impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
1048    type Error = DataFusionError;
1049
1050    fn try_from(
1051        value: &protobuf::ParquetOptions,
1052    ) -> datafusion_common::Result<Self, Self::Error> {
1053        Ok(ParquetOptions {
1054            enable_page_index: value.enable_page_index,
1055            pruning: value.pruning,
1056            skip_metadata: value.skip_metadata,
1057            metadata_size_hint: value
1058                .metadata_size_hint_opt
1059                .map(|opt| match opt {
1060                    protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
1061                })
1062                .unwrap_or(None),
1063            pushdown_filters: value.pushdown_filters,
1064            reorder_filters: value.reorder_filters,
1065            force_filter_selections: value.force_filter_selections,
1066            data_pagesize_limit: value.data_pagesize_limit as usize,
1067            write_batch_size: value.write_batch_size as usize,
1068            writer_version: value.writer_version.parse().map_err(|e| {
1069                DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
1070            })?,
1071            compression: value.compression_opt.clone().map(|opt| match opt {
1072                protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
1073            }).unwrap_or(None),
1074            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1075            // Continuing from where we left off in the TryFrom implementation
1076            dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
1077            statistics_enabled: value
1078                .statistics_enabled_opt.clone()
1079                .map(|opt| match opt {
1080                    protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1081                })
1082                .unwrap_or(None),
1083            max_row_group_size: value.max_row_group_size as usize,
1084            created_by: value.created_by.clone(),
1085            column_index_truncate_length: value
1086                .column_index_truncate_length_opt.as_ref()
1087                .map(|opt| match opt {
1088                    protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
1089                })
1090                .unwrap_or(None),
1091            statistics_truncate_length: value
1092                .statistics_truncate_length_opt.as_ref()
1093                .map(|opt| match opt {
1094                    protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
1095                })
1096                .unwrap_or(None),
1097            data_page_row_count_limit: value.data_page_row_count_limit as usize,
1098            encoding: value
1099                .encoding_opt.clone()
1100                .map(|opt| match opt {
1101                    protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
1102                })
1103                .unwrap_or(None),
1104            bloom_filter_on_read: value.bloom_filter_on_read,
1105            bloom_filter_on_write: value.bloom_filter_on_write,
1106            bloom_filter_fpp: value.clone()
1107                .bloom_filter_fpp_opt
1108                .map(|opt| match opt {
1109                    protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1110                })
1111                .unwrap_or(None),
1112            bloom_filter_ndv: value.clone()
1113                .bloom_filter_ndv_opt
1114                .map(|opt| match opt {
1115                    protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1116                })
1117                .unwrap_or(None),
1118            allow_single_file_parallelism: value.allow_single_file_parallelism,
1119            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
1120            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
1121            schema_force_view_types: value.schema_force_view_types,
1122            binary_as_string: value.binary_as_string,
1123            coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1124                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1125            }).unwrap_or(None),
1126            coerce_int96_tz: value.coerce_int96_tz_opt.clone().map(|opt| match opt {
1127                protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => Some(v),
1128            }).unwrap_or(None),
1129            skip_arrow_metadata: value.skip_arrow_metadata,
1130            max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1131                protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1132            }).unwrap_or(None),
1133            content_defined_chunking: value.content_defined_chunking.map(ParquetCdcOptions::from).unwrap_or_default(),
1134        })
1135    }
1136}
1137
1138impl From<protobuf::ParquetCdcOptions> for ParquetCdcOptions {
1139    fn from(value: protobuf::ParquetCdcOptions) -> Self {
1140        ParquetCdcOptions {
1141            enabled: value.enabled,
1142            min_chunk_size: value.min_chunk_size as usize,
1143            max_chunk_size: value.max_chunk_size as usize,
1144            norm_level: value.norm_level,
1145        }
1146    }
1147}
1148
1149impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1150    type Error = DataFusionError;
1151    fn try_from(
1152        value: &protobuf::ParquetColumnOptions,
1153    ) -> datafusion_common::Result<Self, Self::Error> {
1154        Ok(ParquetColumnOptions {
1155            compression: value.compression_opt.clone().map(|opt| match opt {
1156                protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1157            }).unwrap_or(None),
1158            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1159            statistics_enabled: value
1160                .statistics_enabled_opt.clone()
1161                .map(|opt| match opt {
1162                    protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1163                })
1164                .unwrap_or(None),
1165            encoding: value
1166                .encoding_opt.clone()
1167                .map(|opt| match opt {
1168                    protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1169                })
1170                .unwrap_or(None),
1171            bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1172                protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1173            })
1174                .unwrap_or(None),
1175            bloom_filter_fpp: value
1176                .bloom_filter_fpp_opt
1177                .map(|opt| match opt {
1178                    protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1179                })
1180                .unwrap_or(None),
1181            bloom_filter_ndv: value
1182                .bloom_filter_ndv_opt
1183                .map(|opt| match opt {
1184                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1185                })
1186                .unwrap_or(None),
1187        })
1188    }
1189}
1190
1191impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1192    type Error = DataFusionError;
1193    fn try_from(
1194        value: &protobuf::TableParquetOptions,
1195    ) -> datafusion_common::Result<Self, Self::Error> {
1196        let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1197            HashMap::new();
1198        for protobuf::ParquetColumnSpecificOptions {
1199            column_name,
1200            options: maybe_options,
1201        } in &value.column_specific_options
1202        {
1203            if let Some(options) = maybe_options {
1204                column_specific_options.insert(column_name.clone(), options.try_into()?);
1205            }
1206        }
1207        let opts = TableParquetOptions {
1208            global: value
1209                .global
1210                .as_ref()
1211                .map(|v| v.try_into())
1212                .unwrap()
1213                .unwrap(),
1214            column_specific_options,
1215            ..Default::default()
1216        };
1217        Ok(opts)
1218    }
1219}
1220
1221impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1222    type Error = DataFusionError;
1223
1224    fn try_from(
1225        proto_opts: &protobuf::JsonOptions,
1226    ) -> datafusion_common::Result<Self, Self::Error> {
1227        let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1228        Ok(JsonOptions {
1229            compression: compression.into(),
1230            compression_level: proto_opts.compression_level,
1231            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1232            newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
1233        })
1234    }
1235}
1236
1237pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1238    protobuf::TimeUnit::try_from(*value)
1239        .map(|t| t.into())
1240        .map_err(|_| Error::unknown("TimeUnit", *value))
1241}
1242
1243pub fn parse_i32_to_interval_unit(
1244    value: &i32,
1245) -> datafusion_common::Result<IntervalUnit, Error> {
1246    protobuf::IntervalUnit::try_from(*value)
1247        .map(|t| t.into())
1248        .map_err(|_| Error::unknown("IntervalUnit", *value))
1249}
1250
/// Moves the elements of `v` into a fixed-size array `[T; N]`.
///
/// # Panics
/// Panics if `v.len() != N`; callers are expected to have validated the length.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    match <[T; N]>::try_from(v) {
        Ok(array) => array,
        Err(rejected) => {
            panic!("Expected a Vec of length {} but it was {}", N, rejected.len())
        }
    }
}
1257
1258/// Converts a vector of `protobuf::Field`s to `Arc<arrow::Field>`s.
1259pub fn parse_proto_fields_to_fields<'a, I>(
1260    fields: I,
1261) -> std::result::Result<Vec<Field>, Error>
1262where
1263    I: IntoIterator<Item = &'a protobuf::Field>,
1264{
1265    fields
1266        .into_iter()
1267        .map(Field::try_from)
1268        .collect::<datafusion_common::Result<_, _>>()
1269}
1270
1271pub(crate) fn csv_writer_options_from_proto(
1272    writer_options: &protobuf::CsvWriterOptions,
1273) -> datafusion_common::Result<WriterBuilder> {
1274    let mut builder = WriterBuilder::new();
1275    if !writer_options.delimiter.is_empty() {
1276        if let Some(delimiter) = writer_options.delimiter.chars().next() {
1277            if delimiter.is_ascii() {
1278                builder = builder.with_delimiter(delimiter as u8);
1279            } else {
1280                return Err(proto_error("CSV Delimiter is not ASCII"));
1281            }
1282        } else {
1283            return Err(proto_error("Error parsing CSV Delimiter"));
1284        }
1285    }
1286    if !writer_options.quote.is_empty() {
1287        if let Some(quote) = writer_options.quote.chars().next() {
1288            if quote.is_ascii() {
1289                builder = builder.with_quote(quote as u8);
1290            } else {
1291                return Err(proto_error("CSV Quote is not ASCII"));
1292            }
1293        } else {
1294            return Err(proto_error("Error parsing CSV Quote"));
1295        }
1296    }
1297    if !writer_options.escape.is_empty() {
1298        if let Some(escape) = writer_options.escape.chars().next() {
1299            if escape.is_ascii() {
1300                builder = builder.with_escape(escape as u8);
1301            } else {
1302                return Err(proto_error("CSV Escape is not ASCII"));
1303            }
1304        } else {
1305            return Err(proto_error("Error parsing CSV Escape"));
1306        }
1307    }
1308    let quote_style = match protobuf::CsvQuoteStyle::try_from(writer_options.quote_style)
1309    {
1310        Ok(protobuf::CsvQuoteStyle::Always) => QuoteStyle::Always,
1311        Ok(protobuf::CsvQuoteStyle::NonNumeric) => QuoteStyle::NonNumeric,
1312        Ok(protobuf::CsvQuoteStyle::Never) => QuoteStyle::Never,
1313        Ok(protobuf::CsvQuoteStyle::Necessary) => QuoteStyle::Necessary,
1314        _ => Err(proto_error(
1315            "Unknown quote style, must be one of: 'Always', 'NonNumeric', 'Never', 'Necessary'",
1316        ))?,
1317    };
1318    Ok(builder
1319        .with_header(writer_options.has_header)
1320        .with_date_format(writer_options.date_format.clone())
1321        .with_datetime_format(writer_options.datetime_format.clone())
1322        .with_timestamp_format(writer_options.timestamp_format.clone())
1323        .with_time_format(writer_options.time_format.clone())
1324        .with_null(writer_options.null_value.clone())
1325        .with_double_quote(writer_options.double_quote)
1326        .with_quote_style(quote_style)
1327        .with_ignore_leading_whitespace(writer_options.ignore_leading_whitespace)
1328        .with_ignore_trailing_whitespace(writer_options.ignore_trailing_whitespace))
1329}
1330
#[cfg(test)]
mod tests {
    // Round-trip tests: encode DataFusion Parquet options to protobuf and
    // decode them back, checking the fields survive unchanged.
    use datafusion_common::config::{
        ParquetCdcOptions, ParquetOptions, TableParquetOptions,
    };

    use crate::protobuf_common as proto;

    /// Encodes `opts` to protobuf and decodes the result back.
    fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
        let encoded: proto::ParquetOptions = (&opts).try_into().expect("to_proto");
        ParquetOptions::try_from(&encoded).expect("from_proto")
    }

    /// Encodes table-level `opts` to protobuf and decodes the result back.
    fn table_parquet_options_proto_round_trip(
        opts: TableParquetOptions,
    ) -> TableParquetOptions {
        let encoded: proto::TableParquetOptions = (&opts).try_into().expect("to_proto");
        TableParquetOptions::try_from(&encoded).expect("from_proto")
    }

    #[test]
    fn test_parquet_options_cdc_disabled_round_trip() {
        let original = ParquetOptions::default();
        assert!(!original.content_defined_chunking.enabled);
        assert_eq!(parquet_options_proto_round_trip(original.clone()), original);
    }

    #[test]
    fn test_parquet_options_coerce_int96_tz_unset_round_trip() {
        let original = ParquetOptions::default();
        assert!(original.coerce_int96_tz.is_none());
        let recovered = parquet_options_proto_round_trip(original);
        assert!(recovered.coerce_int96_tz.is_none());
    }

    #[test]
    fn test_parquet_options_coerce_int96_tz_set_round_trip() {
        let recovered = parquet_options_proto_round_trip(ParquetOptions {
            coerce_int96: Some("us".to_string()),
            coerce_int96_tz: Some("UTC".to_string()),
            ..ParquetOptions::default()
        });
        assert_eq!(recovered.coerce_int96.as_deref(), Some("us"));
        assert_eq!(recovered.coerce_int96_tz.as_deref(), Some("UTC"));
    }

    #[test]
    fn test_table_parquet_options_coerce_int96_tz_round_trip() {
        let mut original = TableParquetOptions::default();
        original.global.coerce_int96 = Some("us".to_string());
        original.global.coerce_int96_tz = Some("America/Los_Angeles".to_string());
        let recovered = table_parquet_options_proto_round_trip(original);
        assert_eq!(recovered.global.coerce_int96.as_deref(), Some("us"));
        assert_eq!(
            recovered.global.coerce_int96_tz.as_deref(),
            Some("America/Los_Angeles")
        );
    }

    #[test]
    fn test_parquet_options_cdc_enabled_round_trip() {
        let recovered = parquet_options_proto_round_trip(ParquetOptions {
            content_defined_chunking: ParquetCdcOptions {
                enabled: true,
                min_chunk_size: 128 * 1024,
                max_chunk_size: 512 * 1024,
                norm_level: 2,
            },
            ..ParquetOptions::default()
        });
        let ParquetCdcOptions {
            enabled,
            min_chunk_size,
            max_chunk_size,
            norm_level,
        } = recovered.content_defined_chunking;
        assert!(enabled);
        assert_eq!(min_chunk_size, 128 * 1024);
        assert_eq!(max_chunk_size, 512 * 1024);
        assert_eq!(norm_level, 2);
    }

    #[test]
    fn test_parquet_options_cdc_negative_norm_level_round_trip() {
        let recovered = parquet_options_proto_round_trip(ParquetOptions {
            content_defined_chunking: ParquetCdcOptions {
                enabled: true,
                norm_level: -3,
                ..ParquetCdcOptions::default()
            },
            ..ParquetOptions::default()
        });
        assert_eq!(recovered.content_defined_chunking.norm_level, -3);
    }

    #[test]
    fn test_table_parquet_options_cdc_round_trip() {
        let mut original = TableParquetOptions::default();
        original.global.content_defined_chunking = ParquetCdcOptions {
            enabled: true,
            min_chunk_size: 64 * 1024,
            max_chunk_size: 2 * 1024 * 1024,
            norm_level: -1,
        };

        let recovered = table_parquet_options_proto_round_trip(original);
        let ParquetCdcOptions {
            enabled,
            min_chunk_size,
            max_chunk_size,
            norm_level,
        } = recovered.global.content_defined_chunking;
        assert!(enabled);
        assert_eq!(min_chunk_size, 64 * 1024);
        assert_eq!(max_chunk_size, 2 * 1024 * 1024);
        assert_eq!(norm_level, -1);
    }

    #[test]
    fn test_table_parquet_options_cdc_disabled_round_trip() {
        let original = TableParquetOptions::default();
        assert!(!original.global.content_defined_chunking.enabled);
        let recovered = table_parquet_options_proto_round_trip(original);
        assert!(!recovered.global.content_defined_chunking.enabled);
    }
}