Skip to main content

datafusion_proto_common/from_proto/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29    TimeUnit, UnionFields, UnionMode, i256,
30};
31use arrow::ipc::{
32    convert::fb_to_schema,
33    reader::{read_dictionary, read_record_batch},
34    root_as_message,
35    writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
36};
37
38use datafusion_common::{
39    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
40    DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
41    arrow_datafusion_err,
42    config::{
43        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
44        TableParquetOptions,
45    },
46    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
47    parsers::CompressionTypeVariant,
48    plan_datafusion_err,
49    stats::Precision,
50};
51
/// Errors that can occur while deserializing protobuf messages back into
/// DataFusion / Arrow types ("from proto" conversions).
#[derive(Debug)]
pub enum Error {
    /// Catch-all for deserialization failures with no more specific variant.
    General(String),

    /// An error raised by DataFusion itself while rebuilding a value.
    DataFusionError(DataFusionError),

    /// A protobuf field required by the conversion was `None` on the wire.
    MissingRequiredField(String),

    /// A repeated field that must contain at least one element was empty.
    AtLeastOneValue(String),

    /// An `i32` read from the wire does not map to any variant of the named
    /// protobuf enum.
    UnknownEnumVariant { name: String, value: i32 },
}
64
65impl std::fmt::Display for Error {
66    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
67        match self {
68            Self::General(desc) => write!(f, "General error: {desc}"),
69
70            Self::DataFusionError(desc) => {
71                write!(f, "DataFusion error: {desc:?}")
72            }
73
74            Self::MissingRequiredField(name) => {
75                write!(f, "Missing required field {name}")
76            }
77            Self::AtLeastOneValue(name) => {
78                write!(f, "Must have at least one {name}, found 0")
79            }
80            Self::UnknownEnumVariant { name, value } => {
81                write!(f, "Unknown i32 value for {name} enum: {value}")
82            }
83        }
84    }
85}
86
// Marker impl: no overrides needed — `Display`/`Debug` above supply the text,
// and this lets `Error` flow through `Box<dyn std::error::Error>` plumbing.
impl std::error::Error for Error {}
88
89impl From<DataFusionError> for Error {
90    fn from(e: DataFusionError) -> Self {
91        Error::DataFusionError(e)
92    }
93}
94
95impl Error {
96    pub fn required(field: impl Into<String>) -> Error {
97        Error::MissingRequiredField(field.into())
98    }
99
100    pub fn unknown(name: impl Into<String>, value: i32) -> Error {
101        Error::UnknownEnumVariant {
102            name: name.into(),
103            value,
104        }
105    }
106}
107
impl From<Error> for DataFusionError {
    // Lift a deserialization error into DataFusion's error type as a plan
    // error, using the `Display` rendering above as the message text.
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
113
/// An extension trait that adds the methods `optional` and `required` to any
/// Option containing a type implementing `TryInto<U, Error = Error>`
///
/// Protobuf message fields arrive as `Option` on the wire; this trait lets a
/// call site state whether absence is acceptable (`optional`) or an error
/// (`required`) in a single expression.
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field to an option of a different type
    ///
    /// Returns None if the option is None, otherwise calls [`TryInto::try_into`]
    /// on the contained data, returning any error encountered
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field to a different type, returning an error if None
    ///
    /// Returns `Error::MissingRequiredField` if None, otherwise calls [`TryInto::try_into`]
    /// on the contained data, returning any error encountered
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
129
130impl<T, U> FromOptionalField<U> for Option<T>
131where
132    T: TryInto<U, Error = Error>,
133{
134    fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
135        self.map(|t| t.try_into()).transpose()
136    }
137
138    fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
139        match self {
140            None => Err(Error::required(field)),
141            Some(t) => t.try_into(),
142        }
143    }
144}
145
impl From<protobuf::ColumnRelation> for TableReference {
    fn from(rel: protobuf::ColumnRelation) -> Self {
        // Re-parse the serialized relation string back into a TableReference.
        // NOTE(review): the `true` flag presumably enables identifier
        // normalization (e.g. lowercasing unquoted parts) — confirm against
        // `TableReference::parse_str_normalized`.
        Self::parse_str_normalized(rel.relation.as_str(), true)
    }
}
151
152impl From<protobuf::Column> for Column {
153    fn from(c: protobuf::Column) -> Self {
154        let protobuf::Column { relation, name } = c;
155
156        Self::new(relation, name)
157    }
158}
159
160impl From<&protobuf::Column> for Column {
161    fn from(c: &protobuf::Column) -> Self {
162        c.clone().into()
163    }
164}
165
166impl TryFrom<&protobuf::DfSchema> for DFSchema {
167    type Error = Error;
168
169    fn try_from(
170        df_schema: &protobuf::DfSchema,
171    ) -> datafusion_common::Result<Self, Self::Error> {
172        let df_fields = df_schema.columns.clone();
173        let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
174            .iter()
175            .map(|df_field| {
176                let field: Field = df_field.field.as_ref().required("field")?;
177                Ok((
178                    df_field.qualifier.as_ref().map(|q| q.clone().into()),
179                    Arc::new(field),
180                ))
181            })
182            .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
183
184        Ok(DFSchema::new_with_metadata(
185            qualifiers_and_fields,
186            df_schema.metadata.clone(),
187        )?)
188    }
189}
190
191impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
192    type Error = Error;
193
194    fn try_from(
195        df_schema: protobuf::DfSchema,
196    ) -> datafusion_common::Result<Self, Self::Error> {
197        let dfschema: DFSchema = (&df_schema).try_into()?;
198        Ok(Arc::new(dfschema))
199    }
200}
201
202impl TryFrom<&protobuf::ArrowType> for DataType {
203    type Error = Error;
204
205    fn try_from(
206        arrow_type: &protobuf::ArrowType,
207    ) -> datafusion_common::Result<Self, Self::Error> {
208        arrow_type
209            .arrow_type_enum
210            .as_ref()
211            .required("arrow_type_enum")
212    }
213}
214
/// Maps every variant of the protobuf `ArrowTypeEnum` oneof back onto the
/// corresponding Arrow [`DataType`]. Nested types (lists, structs, unions,
/// maps, dictionaries, run-end encoding) recurse through the `Field`/`DataType`
/// conversions; enum-coded sub-fields (time units, interval units) go through
/// the `parse_i32_to_*` helpers defined elsewhere in this module.
impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            // Scalar / primitive types: direct 1:1 mapping, payload ignored.
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                // Wire convention: an empty timezone string means "no timezone".
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            // Decimal widths: precision/scale are carried as wider ints on the
            // wire and narrowed here; values outside u8/i8 range would wrap —
            // presumably the serializer guarantees the range (TODO confirm).
            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
                precision,
                scale,
            }) => DataType::Decimal32(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
                precision,
                scale,
            }) => DataType::Decimal64(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            // List family: the element `Field` is a required sub-message.
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // Default to index based type ids if not explicitly provided
                let union_fields = if union.type_ids.is_empty() {
                    UnionFields::from_fields(union_fields)
                } else {
                    let type_ids = union.type_ids.iter().map(|i| *i as i8);
                    UnionFields::try_new(type_ids, union_fields).map_err(|e| {
                        DataFusionError::from(e).context("Deserializing Union DataType")
                    })?
                };
                DataType::Union(union_fields, union_mode)
            }
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                // Both the key and value DataTypes are required sub-messages.
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
            arrow_type::ArrowTypeEnum::RunEndEncoded(run_end_encoded) => {
                let run_ends_field: Field = run_end_encoded
                    .as_ref()
                    .run_ends_field
                    .as_deref()
                    .required("run_ends_field")?;
                let value_field: Field = run_end_encoded
                    .as_ref()
                    .values_field
                    .as_deref()
                    .required("values_field")?;
                DataType::RunEndEncoded(run_ends_field.into(), value_field.into())
            }
        })
    }
}
350
351impl TryFrom<&protobuf::Field> for Field {
352    type Error = Error;
353    fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
354        let datatype = field.arrow_type.as_deref().required("arrow_type")?;
355        let field = Self::new(field.name.as_str(), datatype, field.nullable)
356            .with_metadata(field.metadata.clone());
357        Ok(field)
358    }
359}
360
361impl TryFrom<&protobuf::Schema> for Schema {
362    type Error = Error;
363
364    fn try_from(
365        schema: &protobuf::Schema,
366    ) -> datafusion_common::Result<Self, Self::Error> {
367        let fields = schema
368            .columns
369            .iter()
370            .map(Field::try_from)
371            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
372        Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
373    }
374}
375
/// Reconstructs a [`ScalarValue`] from its protobuf form.
///
/// Primitive scalars map 1:1; nested scalars (list/struct/map variants) were
/// serialized as a single-column Arrow IPC record batch and are decoded by
/// replaying the IPC schema, dictionary batches, and record batch below.
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        // The oneof is required: an empty message is a malformed payload.
        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            // Narrow integer types travel as wider protobuf ints; narrowed here.
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // Nested ScalarValue types are serialized using arrow IPC format
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                // The protobuf-level schema is mandatory for nested scalars.
                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing nested ScalarValue"
                            .to_string(),
                    ));
                };

                // IPC dictionary batch IDs are assigned when encoding the schema, but our protobuf
                // `Schema` doesn't preserve those IDs. Reconstruct them deterministically by
                // round-tripping the schema through IPC.
                let schema: Schema = {
                    let ipc_gen = IpcDataGenerator {};
                    let write_options = IpcWriteOptions::default();
                    let mut dict_tracker = DictionaryTracker::new(false);
                    let encoded_schema = ipc_gen.schema_to_bytes_with_dictionary_tracker(
                        &schema,
                        &mut dict_tracker,
                        &write_options,
                    );
                    let message =
                        root_as_message(encoded_schema.ipc_message.as_slice()).map_err(
                            |e| {
                                Error::General(format!(
                                    "Error IPC schema message while deserializing nested ScalarValue: {e}"
                                ))
                            },
                        )?;
                    let ipc_schema = message.header_as_schema().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing nested ScalarValue schema"
                                .to_string(),
                        )
                    })?;
                    fb_to_schema(ipc_schema)
                };

                // Decode the record-batch flatbuffer header and its body buffer.
                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing nested ScalarValue: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing nested ScalarValue"
                            .to_string(),
                    )
                })?;

                // Replay every serialized dictionary batch so dictionary-encoded
                // columns in the record batch can resolve their values.
                let mut dict_by_id: HashMap<i64, ArrayRef> = HashMap::new();
                for protobuf::scalar_nested_value::Dictionary {
                    ipc_message,
                    arrow_data,
                } in dictionaries
                {
                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                        Error::General(format!(
                            "Error IPC message while deserializing nested ScalarValue dictionary message: {e}"
                        ))
                    })?;
                    let buffer = Buffer::from(arrow_data.as_slice());

                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing nested ScalarValue dictionary message"
                                .to_string(),
                        )
                    })?;
                    read_dictionary(
                        &buffer,
                        dict_batch,
                        &schema,
                        &mut dict_by_id,
                        &message.version(),
                    )
                    .map_err(|e| arrow_datafusion_err!(e))
                    .map_err(|e| e.context("Decoding nested ScalarValue dictionary"))?;
                }

                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding nested ScalarValue value"))?;
                // The nested scalar is always the batch's first (only) column;
                // downcast it according to which oneof variant we matched.
                let arr = record_batch.column(0);
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // Outer match already restricted `value` to the five
                    // variants above.
                    _ => unreachable!(),
                }
            }
            Value::NullValue(v) => {
                // A typed NULL: decode the DataType, then build its null scalar.
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            // Decimals carry their integer representation as big-endian bytes;
            // `vec_to_array` (defined elsewhere in this module) fixes the width.
            Value::Decimal32Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal64Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // Wire convention: empty timezone string means "no timezone".
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                // Recursive conversion: the dictionary's value is itself a
                // boxed ScalarValue message.
                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::RunEndEncodedValue(v) => {
                let run_ends_field: Field = v
                    .run_ends_field
                    .as_ref()
                    .ok_or_else(|| Error::required("run_ends_field"))?
                    .try_into()?;

                let values_field: Field = v
                    .values_field
                    .as_ref()
                    .ok_or_else(|| Error::required("values_field"))?
                    .try_into()?;

                // Recursive conversion of the contained scalar.
                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::RunEndEncoded(
                    run_ends_field.into(),
                    values_field.into(),
                    Box::new(value),
                )
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                // Mode is wire-encoded as a raw i32: 0 = Sparse, 1 = Dense.
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // `collect::<Option<Vec<_>>>` yields None if any field is unset.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let union_fields = UnionFields::try_new(ids, fields).map_err(|e| {
                    DataFusionError::from(e).context("Deserializing Union ScalarValue")
                })?;
                let v_id = val.value_id as i8;
                // An absent value means a NULL union scalar.
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, union_fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                Self::FixedSizeBinary(v.length, Some(v.clone().values))
            }
        })
    }
}
702
703impl From<protobuf::TimeUnit> for TimeUnit {
704    fn from(time_unit: protobuf::TimeUnit) -> Self {
705        match time_unit {
706            protobuf::TimeUnit::Second => TimeUnit::Second,
707            protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
708            protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
709            protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
710        }
711    }
712}
713
714impl From<protobuf::IntervalUnit> for IntervalUnit {
715    fn from(interval_unit: protobuf::IntervalUnit) -> Self {
716        match interval_unit {
717            protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
718            protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
719            protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
720        }
721    }
722}
723
724impl From<protobuf::Constraints> for Constraints {
725    fn from(constraints: protobuf::Constraints) -> Self {
726        Constraints::new_unverified(
727            constraints
728                .constraints
729                .into_iter()
730                .map(|item| item.into())
731                .collect(),
732        )
733    }
734}
735
736impl From<protobuf::Constraint> for Constraint {
737    fn from(value: protobuf::Constraint) -> Self {
738        match value.constraint_mode.unwrap() {
739            protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
740                Constraint::PrimaryKey(
741                    elem.indices.into_iter().map(|item| item as usize).collect(),
742                )
743            }
744            protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
745                elem.indices.into_iter().map(|item| item as usize).collect(),
746            ),
747        }
748    }
749}
750
751impl From<&protobuf::ColumnStats> for ColumnStatistics {
752    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
753        ColumnStatistics {
754            null_count: if let Some(nc) = &cs.null_count {
755                nc.clone().into()
756            } else {
757                Precision::Absent
758            },
759            max_value: if let Some(max) = &cs.max_value {
760                max.clone().into()
761            } else {
762                Precision::Absent
763            },
764            min_value: if let Some(min) = &cs.min_value {
765                min.clone().into()
766            } else {
767                Precision::Absent
768            },
769            sum_value: if let Some(sum) = &cs.sum_value {
770                sum.clone().into()
771            } else {
772                Precision::Absent
773            },
774            distinct_count: if let Some(dc) = &cs.distinct_count {
775                dc.clone().into()
776            } else {
777                Precision::Absent
778            },
779            byte_size: if let Some(sbs) = &cs.byte_size {
780                sbs.clone().into()
781            } else {
782                Precision::Absent
783            },
784        }
785    }
786}
787
788impl From<protobuf::Precision> for Precision<usize> {
789    fn from(s: protobuf::Precision) -> Self {
790        let Ok(precision_type) = s.precision_info.try_into() else {
791            return Precision::Absent;
792        };
793        match precision_type {
794            protobuf::PrecisionInfo::Exact => {
795                if let Some(val) = s.val {
796                    if let Ok(ScalarValue::UInt64(Some(val))) =
797                        ScalarValue::try_from(&val)
798                    {
799                        Precision::Exact(val as usize)
800                    } else {
801                        Precision::Absent
802                    }
803                } else {
804                    Precision::Absent
805                }
806            }
807            protobuf::PrecisionInfo::Inexact => {
808                if let Some(val) = s.val {
809                    if let Ok(ScalarValue::UInt64(Some(val))) =
810                        ScalarValue::try_from(&val)
811                    {
812                        Precision::Inexact(val as usize)
813                    } else {
814                        Precision::Absent
815                    }
816                } else {
817                    Precision::Absent
818                }
819            }
820            protobuf::PrecisionInfo::Absent => Precision::Absent,
821        }
822    }
823}
824
825impl From<protobuf::Precision> for Precision<ScalarValue> {
826    fn from(s: protobuf::Precision) -> Self {
827        let Ok(precision_type) = s.precision_info.try_into() else {
828            return Precision::Absent;
829        };
830        match precision_type {
831            protobuf::PrecisionInfo::Exact => {
832                if let Some(val) = s.val {
833                    if let Ok(val) = ScalarValue::try_from(&val) {
834                        Precision::Exact(val)
835                    } else {
836                        Precision::Absent
837                    }
838                } else {
839                    Precision::Absent
840                }
841            }
842            protobuf::PrecisionInfo::Inexact => {
843                if let Some(val) = s.val {
844                    if let Ok(val) = ScalarValue::try_from(&val) {
845                        Precision::Inexact(val)
846                    } else {
847                        Precision::Absent
848                    }
849                } else {
850                    Precision::Absent
851                }
852            }
853            protobuf::PrecisionInfo::Absent => Precision::Absent,
854        }
855    }
856}
857
858impl From<protobuf::JoinSide> for JoinSide {
859    fn from(t: protobuf::JoinSide) -> Self {
860        match t {
861            protobuf::JoinSide::LeftSide => JoinSide::Left,
862            protobuf::JoinSide::RightSide => JoinSide::Right,
863            protobuf::JoinSide::None => JoinSide::None,
864        }
865    }
866}
867
868impl From<&protobuf::Constraint> for Constraint {
869    fn from(value: &protobuf::Constraint) -> Self {
870        match &value.constraint_mode {
871            Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
872                Constraint::PrimaryKey(
873                    elem.indices.iter().map(|&item| item as usize).collect(),
874                )
875            }
876            Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
877                Constraint::Unique(
878                    elem.indices.iter().map(|&item| item as usize).collect(),
879                )
880            }
881            None => panic!("constraint_mode not set"),
882        }
883    }
884}
885
886impl TryFrom<&protobuf::Constraints> for Constraints {
887    type Error = DataFusionError;
888
889    fn try_from(
890        constraints: &protobuf::Constraints,
891    ) -> datafusion_common::Result<Self, Self::Error> {
892        Ok(Constraints::new_unverified(
893            constraints
894                .constraints
895                .iter()
896                .map(|item| item.into())
897                .collect(),
898        ))
899    }
900}
901
902impl TryFrom<&protobuf::Statistics> for Statistics {
903    type Error = DataFusionError;
904
905    fn try_from(
906        s: &protobuf::Statistics,
907    ) -> datafusion_common::Result<Self, Self::Error> {
908        // Keep it sync with Statistics::to_proto
909        Ok(Statistics {
910            num_rows: if let Some(nr) = &s.num_rows {
911                nr.clone().into()
912            } else {
913                Precision::Absent
914            },
915            total_byte_size: if let Some(tbs) = &s.total_byte_size {
916                tbs.clone().into()
917            } else {
918                Precision::Absent
919            },
920            // No column statistic (None) is encoded with empty array
921            column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
922        })
923    }
924}
925
926impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
927    fn from(value: protobuf::CompressionTypeVariant) -> Self {
928        match value {
929            protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
930            protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
931            protobuf::CompressionTypeVariant::Xz => Self::XZ,
932            protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
933            protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
934        }
935    }
936}
937
938impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
939    fn from(value: CompressionTypeVariant) -> Self {
940        match value {
941            CompressionTypeVariant::GZIP => Self::Gzip,
942            CompressionTypeVariant::BZIP2 => Self::Bzip2,
943            CompressionTypeVariant::XZ => Self::Xz,
944            CompressionTypeVariant::ZSTD => Self::Zstd,
945            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
946        }
947    }
948}
949
950impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
951    type Error = DataFusionError;
952
953    fn try_from(
954        opts: &protobuf::CsvWriterOptions,
955    ) -> datafusion_common::Result<Self, Self::Error> {
956        let write_options = csv_writer_options_from_proto(opts)?;
957        let compression: CompressionTypeVariant = opts.compression().into();
958        Ok(CsvWriterOptions::new(write_options, compression))
959    }
960}
961
962impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
963    type Error = DataFusionError;
964
965    fn try_from(
966        opts: &protobuf::JsonWriterOptions,
967    ) -> datafusion_common::Result<Self, Self::Error> {
968        let compression: CompressionTypeVariant = opts.compression().into();
969        Ok(JsonWriterOptions::new(compression))
970    }
971}
972
973impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
974    type Error = DataFusionError;
975
976    fn try_from(
977        proto_opts: &protobuf::CsvOptions,
978    ) -> datafusion_common::Result<Self, Self::Error> {
979        Ok(CsvOptions {
980            has_header: proto_opts.has_header.first().map(|h| *h != 0),
981            delimiter: proto_opts.delimiter[0],
982            quote: proto_opts.quote[0],
983            terminator: proto_opts.terminator.first().copied(),
984            escape: proto_opts.escape.first().copied(),
985            double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
986            newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
987            compression: proto_opts.compression().into(),
988            compression_level: proto_opts.compression_level,
989            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
990            date_format: (!proto_opts.date_format.is_empty())
991                .then(|| proto_opts.date_format.clone()),
992            datetime_format: (!proto_opts.datetime_format.is_empty())
993                .then(|| proto_opts.datetime_format.clone()),
994            timestamp_format: (!proto_opts.timestamp_format.is_empty())
995                .then(|| proto_opts.timestamp_format.clone()),
996            timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
997                .then(|| proto_opts.timestamp_tz_format.clone()),
998            time_format: (!proto_opts.time_format.is_empty())
999                .then(|| proto_opts.time_format.clone()),
1000            null_value: (!proto_opts.null_value.is_empty())
1001                .then(|| proto_opts.null_value.clone()),
1002            null_regex: (!proto_opts.null_regex.is_empty())
1003                .then(|| proto_opts.null_regex.clone()),
1004            comment: proto_opts.comment.first().copied(),
1005            truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
1006        })
1007    }
1008}
1009
1010impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
1011    type Error = DataFusionError;
1012
1013    fn try_from(
1014        value: &protobuf::ParquetOptions,
1015    ) -> datafusion_common::Result<Self, Self::Error> {
1016        Ok(ParquetOptions {
1017            enable_page_index: value.enable_page_index,
1018            pruning: value.pruning,
1019            skip_metadata: value.skip_metadata,
1020            metadata_size_hint: value
1021                .metadata_size_hint_opt
1022                .map(|opt| match opt {
1023                    protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
1024                })
1025                .unwrap_or(None),
1026            pushdown_filters: value.pushdown_filters,
1027            reorder_filters: value.reorder_filters,
1028            force_filter_selections: value.force_filter_selections,
1029            data_pagesize_limit: value.data_pagesize_limit as usize,
1030            write_batch_size: value.write_batch_size as usize,
1031            writer_version: value.writer_version.parse().map_err(|e| {
1032                DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
1033            })?,
1034            compression: value.compression_opt.clone().map(|opt| match opt {
1035                protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
1036            }).unwrap_or(None),
1037            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1038            // Continuing from where we left off in the TryFrom implementation
1039            dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
1040            statistics_enabled: value
1041                .statistics_enabled_opt.clone()
1042                .map(|opt| match opt {
1043                    protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1044                })
1045                .unwrap_or(None),
1046            max_row_group_size: value.max_row_group_size as usize,
1047            created_by: value.created_by.clone(),
1048            column_index_truncate_length: value
1049                .column_index_truncate_length_opt.as_ref()
1050                .map(|opt| match opt {
1051                    protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
1052                })
1053                .unwrap_or(None),
1054            statistics_truncate_length: value
1055                .statistics_truncate_length_opt.as_ref()
1056                .map(|opt| match opt {
1057                    protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
1058                })
1059                .unwrap_or(None),
1060            data_page_row_count_limit: value.data_page_row_count_limit as usize,
1061            encoding: value
1062                .encoding_opt.clone()
1063                .map(|opt| match opt {
1064                    protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
1065                })
1066                .unwrap_or(None),
1067            bloom_filter_on_read: value.bloom_filter_on_read,
1068            bloom_filter_on_write: value.bloom_filter_on_write,
1069            bloom_filter_fpp: value.clone()
1070                .bloom_filter_fpp_opt
1071                .map(|opt| match opt {
1072                    protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1073                })
1074                .unwrap_or(None),
1075            bloom_filter_ndv: value.clone()
1076                .bloom_filter_ndv_opt
1077                .map(|opt| match opt {
1078                    protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1079                })
1080                .unwrap_or(None),
1081            allow_single_file_parallelism: value.allow_single_file_parallelism,
1082            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
1083            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
1084            schema_force_view_types: value.schema_force_view_types,
1085            binary_as_string: value.binary_as_string,
1086            coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1087                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1088            }).unwrap_or(None),
1089            skip_arrow_metadata: value.skip_arrow_metadata,
1090            max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1091                protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1092            }).unwrap_or(None),
1093        })
1094    }
1095}
1096
1097impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1098    type Error = DataFusionError;
1099    fn try_from(
1100        value: &protobuf::ParquetColumnOptions,
1101    ) -> datafusion_common::Result<Self, Self::Error> {
1102        Ok(ParquetColumnOptions {
1103            compression: value.compression_opt.clone().map(|opt| match opt {
1104                protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1105            }).unwrap_or(None),
1106            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1107            statistics_enabled: value
1108                .statistics_enabled_opt.clone()
1109                .map(|opt| match opt {
1110                    protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1111                })
1112                .unwrap_or(None),
1113            encoding: value
1114                .encoding_opt.clone()
1115                .map(|opt| match opt {
1116                    protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1117                })
1118                .unwrap_or(None),
1119            bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1120                protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1121            })
1122                .unwrap_or(None),
1123            bloom_filter_fpp: value
1124                .bloom_filter_fpp_opt
1125                .map(|opt| match opt {
1126                    protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1127                })
1128                .unwrap_or(None),
1129            bloom_filter_ndv: value
1130                .bloom_filter_ndv_opt
1131                .map(|opt| match opt {
1132                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1133                })
1134                .unwrap_or(None),
1135        })
1136    }
1137}
1138
1139impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1140    type Error = DataFusionError;
1141    fn try_from(
1142        value: &protobuf::TableParquetOptions,
1143    ) -> datafusion_common::Result<Self, Self::Error> {
1144        let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1145            HashMap::new();
1146        for protobuf::ParquetColumnSpecificOptions {
1147            column_name,
1148            options: maybe_options,
1149        } in &value.column_specific_options
1150        {
1151            if let Some(options) = maybe_options {
1152                column_specific_options.insert(column_name.clone(), options.try_into()?);
1153            }
1154        }
1155        Ok(TableParquetOptions {
1156            global: value
1157                .global
1158                .as_ref()
1159                .map(|v| v.try_into())
1160                .unwrap()
1161                .unwrap(),
1162            column_specific_options,
1163            key_value_metadata: Default::default(),
1164            crypto: Default::default(),
1165        })
1166    }
1167}
1168
1169impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1170    type Error = DataFusionError;
1171
1172    fn try_from(
1173        proto_opts: &protobuf::JsonOptions,
1174    ) -> datafusion_common::Result<Self, Self::Error> {
1175        let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1176        Ok(JsonOptions {
1177            compression: compression.into(),
1178            compression_level: proto_opts.compression_level,
1179            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1180            newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
1181        })
1182    }
1183}
1184
1185pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1186    protobuf::TimeUnit::try_from(*value)
1187        .map(|t| t.into())
1188        .map_err(|_| Error::unknown("TimeUnit", *value))
1189}
1190
1191pub fn parse_i32_to_interval_unit(
1192    value: &i32,
1193) -> datafusion_common::Result<IntervalUnit, Error> {
1194    protobuf::IntervalUnit::try_from(*value)
1195        .map(|t| t.into())
1196        .map_err(|_| Error::unknown("IntervalUnit", *value))
1197}
1198
/// Converts a `Vec<T>` into a fixed-size array of length `N`.
///
/// # Panics
/// Panics when `v.len() != N` — there is no better way to surface a
/// length mismatch from a `Vec`-to-array conversion, and a mismatch here
/// indicates a deserialization bug.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    let actual = v.len();
    match v.try_into() {
        Ok(array) => array,
        Err(_) => panic!("Expected a Vec of length {N} but it was {actual}"),
    }
}
1205
1206/// Converts a vector of `protobuf::Field`s to `Arc<arrow::Field>`s.
1207pub fn parse_proto_fields_to_fields<'a, I>(
1208    fields: I,
1209) -> std::result::Result<Vec<Field>, Error>
1210where
1211    I: IntoIterator<Item = &'a protobuf::Field>,
1212{
1213    fields
1214        .into_iter()
1215        .map(Field::try_from)
1216        .collect::<datafusion_common::Result<_, _>>()
1217}
1218
1219pub(crate) fn csv_writer_options_from_proto(
1220    writer_options: &protobuf::CsvWriterOptions,
1221) -> datafusion_common::Result<WriterBuilder> {
1222    let mut builder = WriterBuilder::new();
1223    if !writer_options.delimiter.is_empty() {
1224        if let Some(delimiter) = writer_options.delimiter.chars().next() {
1225            if delimiter.is_ascii() {
1226                builder = builder.with_delimiter(delimiter as u8);
1227            } else {
1228                return Err(proto_error("CSV Delimiter is not ASCII"));
1229            }
1230        } else {
1231            return Err(proto_error("Error parsing CSV Delimiter"));
1232        }
1233    }
1234    if !writer_options.quote.is_empty() {
1235        if let Some(quote) = writer_options.quote.chars().next() {
1236            if quote.is_ascii() {
1237                builder = builder.with_quote(quote as u8);
1238            } else {
1239                return Err(proto_error("CSV Quote is not ASCII"));
1240            }
1241        } else {
1242            return Err(proto_error("Error parsing CSV Quote"));
1243        }
1244    }
1245    if !writer_options.escape.is_empty() {
1246        if let Some(escape) = writer_options.escape.chars().next() {
1247            if escape.is_ascii() {
1248                builder = builder.with_escape(escape as u8);
1249            } else {
1250                return Err(proto_error("CSV Escape is not ASCII"));
1251            }
1252        } else {
1253            return Err(proto_error("Error parsing CSV Escape"));
1254        }
1255    }
1256    Ok(builder
1257        .with_header(writer_options.has_header)
1258        .with_date_format(writer_options.date_format.clone())
1259        .with_datetime_format(writer_options.datetime_format.clone())
1260        .with_timestamp_format(writer_options.timestamp_format.clone())
1261        .with_time_format(writer_options.time_format.clone())
1262        .with_null(writer_options.null_value.clone())
1263        .with_double_quote(writer_options.double_quote))
1264}