Skip to main content

datafusion_proto_common/to_proto/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23    EmptyMessage, arrow_type::ArrowTypeEnum, scalar_value::Value,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::{QuoteStyle, WriterBuilder};
27use arrow::datatypes::{
28    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29    SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{
32    CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
33};
34use datafusion_common::parsers::CsvQuoteStyle;
35use datafusion_common::{
36    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
37    DataFusionError, JoinSide, ScalarValue, Statistics,
38    config::{
39        CsvOptions, JsonOptions, ParquetCdcOptions, ParquetColumnOptions, ParquetOptions,
40        TableParquetOptions,
41    },
42    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
43    parsers::CompressionTypeVariant,
44    plan_datafusion_err,
45    stats::Precision,
46};
47
/// Errors produced while converting DataFusion / Arrow values into their
/// protobuf representation.
///
/// The variant descriptions below follow the messages produced by the
/// `Display` implementation for this type.
#[derive(Debug)]
pub enum Error {
    /// A free-form failure described by the contained message.
    General(String),

    /// The contained scalar value is not valid as a DataFusion scalar value.
    InvalidScalarValue(ScalarValue),

    /// The contained Arrow data type is not valid as a DataFusion scalar type.
    InvalidScalarType(DataType),

    /// The contained time unit is not accepted; the rendered message states
    /// that only microsecond and nanosecond units are valid.
    InvalidTimeUnit(TimeUnit),

    /// The requested conversion is not implemented; the string says what.
    NotImplemented(String),
}
60
// Empty impl: `Error` uses the default `std::error::Error` methods (it does
// not override `source`); `Debug` is derived and `Display` is hand-written.
impl std::error::Error for Error {}
62
63impl std::fmt::Display for Error {
64    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65        match self {
66            Self::General(desc) => write!(f, "General error: {desc}"),
67            Self::InvalidScalarValue(value) => {
68                write!(f, "{value:?} is invalid as a DataFusion scalar value")
69            }
70            Self::InvalidScalarType(data_type) => {
71                write!(f, "{data_type} is invalid as a DataFusion scalar type")
72            }
73            Self::InvalidTimeUnit(time_unit) => {
74                write!(
75                    f,
76                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
77                )
78            }
79            Self::NotImplemented(s) => {
80                write!(f, "Not implemented: {s}")
81            }
82        }
83    }
84}
85
86impl From<Error> for DataFusionError {
87    fn from(e: Error) -> Self {
88        plan_datafusion_err!("{}", e)
89    }
90}
91
92impl TryFrom<&Field> for protobuf::Field {
93    type Error = Error;
94
95    fn try_from(field: &Field) -> Result<Self, Self::Error> {
96        let arrow_type = field.data_type().try_into()?;
97        Ok(Self {
98            name: field.name().to_owned(),
99            arrow_type: Some(Box::new(arrow_type)),
100            nullable: field.is_nullable(),
101            children: Vec::new(),
102            metadata: field.metadata().clone(),
103        })
104    }
105}
106
107impl TryFrom<&DataType> for protobuf::ArrowType {
108    type Error = Error;
109
110    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
111        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
112        Ok(Self {
113            arrow_type_enum: Some(arrow_type_enum),
114        })
115    }
116}
117
118impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
119    type Error = Error;
120
121    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
122        let res = match val {
123            DataType::Null => Self::None(EmptyMessage {}),
124            DataType::Boolean => Self::Bool(EmptyMessage {}),
125            DataType::Int8 => Self::Int8(EmptyMessage {}),
126            DataType::Int16 => Self::Int16(EmptyMessage {}),
127            DataType::Int32 => Self::Int32(EmptyMessage {}),
128            DataType::Int64 => Self::Int64(EmptyMessage {}),
129            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
130            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
131            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
132            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
133            DataType::Float16 => Self::Float16(EmptyMessage {}),
134            DataType::Float32 => Self::Float32(EmptyMessage {}),
135            DataType::Float64 => Self::Float64(EmptyMessage {}),
136            DataType::Timestamp(time_unit, timezone) => {
137                Self::Timestamp(protobuf::Timestamp {
138                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
139                    timezone: timezone.as_deref().unwrap_or("").to_string(),
140                })
141            }
142            DataType::Date32 => Self::Date32(EmptyMessage {}),
143            DataType::Date64 => Self::Date64(EmptyMessage {}),
144            DataType::Time32(time_unit) => {
145                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
146            }
147            DataType::Time64(time_unit) => {
148                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
149            }
150            DataType::Duration(time_unit) => {
151                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
152            }
153            DataType::Interval(interval_unit) => {
154                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
155            }
156            DataType::Binary => Self::Binary(EmptyMessage {}),
157            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
158            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
159            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
160            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
161            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
162            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
163            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
164                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
165            })),
166            DataType::FixedSizeList(item_type, size) => {
167                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
168                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
169                    list_size: *size,
170                }))
171            }
172            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
173                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
174            })),
175            DataType::ListView(item_type) => Self::ListView(Box::new(protobuf::List {
176                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
177            })),
178            DataType::LargeListView(item_type) => {
179                Self::LargeListView(Box::new(protobuf::List {
180                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
181                }))
182            }
183            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
184                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
185            }),
186            DataType::Union(fields, union_mode) => {
187                let union_mode = match union_mode {
188                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
189                    UnionMode::Dense => protobuf::UnionMode::Dense,
190                };
191                Self::Union(protobuf::Union {
192                    union_types: convert_arc_fields_to_proto_fields(
193                        fields.iter().map(|(_, item)| item),
194                    )?,
195                    union_mode: union_mode.into(),
196                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
197                })
198            }
199            DataType::Dictionary(key_type, value_type) => {
200                Self::Dictionary(Box::new(protobuf::Dictionary {
201                    key: Some(Box::new(key_type.as_ref().try_into()?)),
202                    value: Some(Box::new(value_type.as_ref().try_into()?)),
203                }))
204            }
205            DataType::Decimal32(precision, scale) => {
206                Self::Decimal32(protobuf::Decimal32Type {
207                    precision: *precision as u32,
208                    scale: *scale as i32,
209                })
210            }
211            DataType::Decimal64(precision, scale) => {
212                Self::Decimal64(protobuf::Decimal64Type {
213                    precision: *precision as u32,
214                    scale: *scale as i32,
215                })
216            }
217            DataType::Decimal128(precision, scale) => {
218                Self::Decimal128(protobuf::Decimal128Type {
219                    precision: *precision as u32,
220                    scale: *scale as i32,
221                })
222            }
223            DataType::Decimal256(precision, scale) => {
224                Self::Decimal256(protobuf::Decimal256Type {
225                    precision: *precision as u32,
226                    scale: *scale as i32,
227                })
228            }
229            DataType::Map(field, sorted) => Self::Map(Box::new(protobuf::Map {
230                field_type: Some(Box::new(field.as_ref().try_into()?)),
231                keys_sorted: *sorted,
232            })),
233            DataType::RunEndEncoded(run_ends_field, values_field) => {
234                Self::RunEndEncoded(Box::new(protobuf::RunEndEncoded {
235                    run_ends_field: Some(Box::new(run_ends_field.as_ref().try_into()?)),
236                    values_field: Some(Box::new(values_field.as_ref().try_into()?)),
237                }))
238            }
239        };
240
241        Ok(res)
242    }
243}
244
245impl From<Column> for protobuf::Column {
246    fn from(c: Column) -> Self {
247        Self {
248            relation: c.relation.map(|relation| protobuf::ColumnRelation {
249                relation: relation.to_string(),
250            }),
251            name: c.name,
252        }
253    }
254}
255
256impl From<&Column> for protobuf::Column {
257    fn from(c: &Column) -> Self {
258        c.clone().into()
259    }
260}
261
262impl TryFrom<&Schema> for protobuf::Schema {
263    type Error = Error;
264
265    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
266        Ok(Self {
267            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
268            metadata: schema.metadata.clone(),
269        })
270    }
271}
272
273impl TryFrom<SchemaRef> for protobuf::Schema {
274    type Error = Error;
275
276    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
277        Ok(Self {
278            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
279            metadata: schema.metadata.clone(),
280        })
281    }
282}
283
284impl TryFrom<&DFSchema> for protobuf::DfSchema {
285    type Error = Error;
286
287    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
288        let columns = s
289            .iter()
290            .map(|(qualifier, field)| {
291                Ok(protobuf::DfField {
292                    field: Some(field.as_ref().try_into()?),
293                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
294                        relation: r.to_string(),
295                    }),
296                })
297            })
298            .collect::<Result<Vec<_>, Error>>()?;
299        Ok(Self {
300            columns,
301            metadata: s.metadata().clone(),
302        })
303    }
304}
305
306impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
307    type Error = Error;
308
309    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
310        s.as_ref().try_into()
311    }
312}
313
314impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
315    type Error = Error;
316
317    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
318        let data_type = val.data_type();
319        match val {
320            ScalarValue::Boolean(val) => {
321                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
322            }
323            ScalarValue::Float16(val) => {
324                create_proto_scalar(val.as_ref(), &data_type, |s| {
325                    Value::Float32Value((*s).into())
326                })
327            }
328            ScalarValue::Float32(val) => {
329                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
330            }
331            ScalarValue::Float64(val) => {
332                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
333            }
334            ScalarValue::Int8(val) => {
335                create_proto_scalar(val.as_ref(), &data_type, |s| {
336                    Value::Int8Value(*s as i32)
337                })
338            }
339            ScalarValue::Int16(val) => {
340                create_proto_scalar(val.as_ref(), &data_type, |s| {
341                    Value::Int16Value(*s as i32)
342                })
343            }
344            ScalarValue::Int32(val) => {
345                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
346            }
347            ScalarValue::Int64(val) => {
348                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
349            }
350            ScalarValue::UInt8(val) => {
351                create_proto_scalar(val.as_ref(), &data_type, |s| {
352                    Value::Uint8Value(*s as u32)
353                })
354            }
355            ScalarValue::UInt16(val) => {
356                create_proto_scalar(val.as_ref(), &data_type, |s| {
357                    Value::Uint16Value(*s as u32)
358                })
359            }
360            ScalarValue::UInt32(val) => {
361                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
362            }
363            ScalarValue::UInt64(val) => {
364                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
365            }
366            ScalarValue::Utf8(val) => {
367                create_proto_scalar(val.as_ref(), &data_type, |s| {
368                    Value::Utf8Value(s.to_owned())
369                })
370            }
371            ScalarValue::LargeUtf8(val) => {
372                create_proto_scalar(val.as_ref(), &data_type, |s| {
373                    Value::LargeUtf8Value(s.to_owned())
374                })
375            }
376            ScalarValue::Utf8View(val) => {
377                create_proto_scalar(val.as_ref(), &data_type, |s| {
378                    Value::Utf8ViewValue(s.to_owned())
379                })
380            }
381            ScalarValue::List(arr) => {
382                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
383            }
384            ScalarValue::LargeList(arr) => {
385                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
386            }
387            ScalarValue::FixedSizeList(arr) => {
388                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
389            }
390            ScalarValue::ListView(arr) => {
391                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
392            }
393            ScalarValue::LargeListView(arr) => {
394                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
395            }
396            ScalarValue::Struct(arr) => {
397                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
398            }
399            ScalarValue::Map(arr) => {
400                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
401            }
402            ScalarValue::Date32(val) => {
403                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
404            }
405            ScalarValue::TimestampMicrosecond(val, tz) => {
406                create_proto_scalar(val.as_ref(), &data_type, |s| {
407                    Value::TimestampValue(protobuf::ScalarTimestampValue {
408                        timezone: tz.as_deref().unwrap_or("").to_string(),
409                        value: Some(
410                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
411                                *s,
412                            ),
413                        ),
414                    })
415                })
416            }
417            ScalarValue::TimestampNanosecond(val, tz) => {
418                create_proto_scalar(val.as_ref(), &data_type, |s| {
419                    Value::TimestampValue(protobuf::ScalarTimestampValue {
420                        timezone: tz.as_deref().unwrap_or("").to_string(),
421                        value: Some(
422                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
423                                *s,
424                            ),
425                        ),
426                    })
427                })
428            }
429            ScalarValue::Decimal32(val, p, s) => match *val {
430                Some(v) => {
431                    let array = v.to_be_bytes();
432                    let vec_val: Vec<u8> = array.to_vec();
433                    Ok(protobuf::ScalarValue {
434                        value: Some(Value::Decimal32Value(protobuf::Decimal32 {
435                            value: vec_val,
436                            p: *p as i64,
437                            s: *s as i64,
438                        })),
439                    })
440                }
441                None => Ok(protobuf::ScalarValue {
442                    value: Some(protobuf::scalar_value::Value::NullValue(
443                        (&data_type).try_into()?,
444                    )),
445                }),
446            },
447            ScalarValue::Decimal64(val, p, s) => match *val {
448                Some(v) => {
449                    let array = v.to_be_bytes();
450                    let vec_val: Vec<u8> = array.to_vec();
451                    Ok(protobuf::ScalarValue {
452                        value: Some(Value::Decimal64Value(protobuf::Decimal64 {
453                            value: vec_val,
454                            p: *p as i64,
455                            s: *s as i64,
456                        })),
457                    })
458                }
459                None => Ok(protobuf::ScalarValue {
460                    value: Some(protobuf::scalar_value::Value::NullValue(
461                        (&data_type).try_into()?,
462                    )),
463                }),
464            },
465            ScalarValue::Decimal128(val, p, s) => match *val {
466                Some(v) => {
467                    let array = v.to_be_bytes();
468                    let vec_val: Vec<u8> = array.to_vec();
469                    Ok(protobuf::ScalarValue {
470                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
471                            value: vec_val,
472                            p: *p as i64,
473                            s: *s as i64,
474                        })),
475                    })
476                }
477                None => Ok(protobuf::ScalarValue {
478                    value: Some(protobuf::scalar_value::Value::NullValue(
479                        (&data_type).try_into()?,
480                    )),
481                }),
482            },
483            ScalarValue::Decimal256(val, p, s) => match *val {
484                Some(v) => {
485                    let array = v.to_be_bytes();
486                    let vec_val: Vec<u8> = array.to_vec();
487                    Ok(protobuf::ScalarValue {
488                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
489                            value: vec_val,
490                            p: *p as i64,
491                            s: *s as i64,
492                        })),
493                    })
494                }
495                None => Ok(protobuf::ScalarValue {
496                    value: Some(protobuf::scalar_value::Value::NullValue(
497                        (&data_type).try_into()?,
498                    )),
499                }),
500            },
501            ScalarValue::Date64(val) => {
502                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
503            }
504            ScalarValue::TimestampSecond(val, tz) => {
505                create_proto_scalar(val.as_ref(), &data_type, |s| {
506                    Value::TimestampValue(protobuf::ScalarTimestampValue {
507                        timezone: tz.as_deref().unwrap_or("").to_string(),
508                        value: Some(
509                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
510                        ),
511                    })
512                })
513            }
514            ScalarValue::TimestampMillisecond(val, tz) => {
515                create_proto_scalar(val.as_ref(), &data_type, |s| {
516                    Value::TimestampValue(protobuf::ScalarTimestampValue {
517                        timezone: tz.as_deref().unwrap_or("").to_string(),
518                        value: Some(
519                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
520                                *s,
521                            ),
522                        ),
523                    })
524                })
525            }
526            ScalarValue::IntervalYearMonth(val) => {
527                create_proto_scalar(val.as_ref(), &data_type, |s| {
528                    Value::IntervalYearmonthValue(*s)
529                })
530            }
531            ScalarValue::Null => Ok(protobuf::ScalarValue {
532                value: Some(Value::NullValue((&data_type).try_into()?)),
533            }),
534
535            ScalarValue::Binary(val) => {
536                create_proto_scalar(val.as_ref(), &data_type, |s| {
537                    Value::BinaryValue(s.to_owned())
538                })
539            }
540            ScalarValue::BinaryView(val) => {
541                create_proto_scalar(val.as_ref(), &data_type, |s| {
542                    Value::BinaryViewValue(s.to_owned())
543                })
544            }
545            ScalarValue::LargeBinary(val) => {
546                create_proto_scalar(val.as_ref(), &data_type, |s| {
547                    Value::LargeBinaryValue(s.to_owned())
548                })
549            }
550            ScalarValue::FixedSizeBinary(length, val) => {
551                create_proto_scalar(val.as_ref(), &data_type, |s| {
552                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
553                        values: s.to_owned(),
554                        length: *length,
555                    })
556                })
557            }
558
559            ScalarValue::Time32Second(v) => {
560                create_proto_scalar(v.as_ref(), &data_type, |v| {
561                    Value::Time32Value(protobuf::ScalarTime32Value {
562                        value: Some(
563                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
564                        ),
565                    })
566                })
567            }
568
569            ScalarValue::Time32Millisecond(v) => {
570                create_proto_scalar(v.as_ref(), &data_type, |v| {
571                    Value::Time32Value(protobuf::ScalarTime32Value {
572                        value: Some(
573                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
574                                *v,
575                            ),
576                        ),
577                    })
578                })
579            }
580
581            ScalarValue::Time64Microsecond(v) => {
582                create_proto_scalar(v.as_ref(), &data_type, |v| {
583                    Value::Time64Value(protobuf::ScalarTime64Value {
584                        value: Some(
585                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
586                                *v,
587                            ),
588                        ),
589                    })
590                })
591            }
592
593            ScalarValue::Time64Nanosecond(v) => {
594                create_proto_scalar(v.as_ref(), &data_type, |v| {
595                    Value::Time64Value(protobuf::ScalarTime64Value {
596                        value: Some(
597                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
598                                *v,
599                            ),
600                        ),
601                    })
602                })
603            }
604
605            ScalarValue::IntervalDayTime(val) => {
606                let value = if let Some(v) = val {
607                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
608                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
609                        days,
610                        milliseconds,
611                    })
612                } else {
613                    Value::NullValue((&data_type).try_into()?)
614                };
615
616                Ok(protobuf::ScalarValue { value: Some(value) })
617            }
618
619            ScalarValue::IntervalMonthDayNano(v) => {
620                let value = if let Some(v) = v {
621                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
622                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
623                        months,
624                        days,
625                        nanos,
626                    })
627                } else {
628                    Value::NullValue((&data_type).try_into()?)
629                };
630
631                Ok(protobuf::ScalarValue { value: Some(value) })
632            }
633
634            ScalarValue::DurationSecond(v) => {
635                let value = match v {
636                    Some(v) => Value::DurationSecondValue(*v),
637                    None => Value::NullValue((&data_type).try_into()?),
638                };
639                Ok(protobuf::ScalarValue { value: Some(value) })
640            }
641            ScalarValue::DurationMillisecond(v) => {
642                let value = match v {
643                    Some(v) => Value::DurationMillisecondValue(*v),
644                    None => Value::NullValue((&data_type).try_into()?),
645                };
646                Ok(protobuf::ScalarValue { value: Some(value) })
647            }
648            ScalarValue::DurationMicrosecond(v) => {
649                let value = match v {
650                    Some(v) => Value::DurationMicrosecondValue(*v),
651                    None => Value::NullValue((&data_type).try_into()?),
652                };
653                Ok(protobuf::ScalarValue { value: Some(value) })
654            }
655            ScalarValue::DurationNanosecond(v) => {
656                let value = match v {
657                    Some(v) => Value::DurationNanosecondValue(*v),
658                    None => Value::NullValue((&data_type).try_into()?),
659                };
660                Ok(protobuf::ScalarValue { value: Some(value) })
661            }
662
663            ScalarValue::Union(val, df_fields, mode) => {
664                let mut fields =
665                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
666                for (id, field) in df_fields.iter() {
667                    let field_id = id as i32;
668                    let field = Some(field.as_ref().try_into()?);
669                    let field = protobuf::UnionField { field_id, field };
670                    fields.push(field);
671                }
672                let mode = match mode {
673                    UnionMode::Sparse => 0,
674                    UnionMode::Dense => 1,
675                };
676                let value = match val {
677                    None => None,
678                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
679                };
680                let val = protobuf::UnionValue {
681                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
682                    value,
683                    fields,
684                    mode,
685                };
686                let val = Value::UnionValue(Box::new(val));
687                let val = protobuf::ScalarValue { value: Some(val) };
688                Ok(val)
689            }
690
691            ScalarValue::Dictionary(index_type, val) => {
692                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
693                Ok(protobuf::ScalarValue {
694                    value: Some(Value::DictionaryValue(Box::new(
695                        protobuf::ScalarDictionaryValue {
696                            index_type: Some(index_type.as_ref().try_into()?),
697                            value: Some(Box::new(value)),
698                        },
699                    ))),
700                })
701            }
702
703            ScalarValue::RunEndEncoded(run_ends_field, values_field, val) => {
704                Ok(protobuf::ScalarValue {
705                    value: Some(Value::RunEndEncodedValue(Box::new(
706                        protobuf::ScalarRunEndEncodedValue {
707                            run_ends_field: Some(run_ends_field.as_ref().try_into()?),
708                            values_field: Some(values_field.as_ref().try_into()?),
709                            value: Some(Box::new(val.as_ref().try_into()?)),
710                        },
711                    ))),
712                })
713            }
714        }
715    }
716}
717
718impl From<&TimeUnit> for protobuf::TimeUnit {
719    fn from(val: &TimeUnit) -> Self {
720        match val {
721            TimeUnit::Second => protobuf::TimeUnit::Second,
722            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
723            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
724            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
725        }
726    }
727}
728
729impl From<&IntervalUnit> for protobuf::IntervalUnit {
730    fn from(interval_unit: &IntervalUnit) -> Self {
731        match interval_unit {
732            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
733            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
734            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
735        }
736    }
737}
738
739impl From<Constraints> for protobuf::Constraints {
740    fn from(value: Constraints) -> Self {
741        let constraints = value.into_iter().map(|item| item.into()).collect();
742        protobuf::Constraints { constraints }
743    }
744}
745
746impl From<Constraint> for protobuf::Constraint {
747    fn from(value: Constraint) -> Self {
748        let res = match value {
749            Constraint::PrimaryKey(indices) => {
750                let indices = indices.into_iter().map(|item| item as u64).collect();
751                protobuf::constraint::ConstraintMode::PrimaryKey(
752                    protobuf::PrimaryKeyConstraint { indices },
753                )
754            }
755            Constraint::Unique(indices) => {
756                let indices = indices.into_iter().map(|item| item as u64).collect();
757                protobuf::constraint::ConstraintMode::PrimaryKey(
758                    protobuf::PrimaryKeyConstraint { indices },
759                )
760            }
761        };
762        protobuf::Constraint {
763            constraint_mode: Some(res),
764        }
765    }
766}
767
768impl From<&Precision<usize>> for protobuf::Precision {
769    fn from(s: &Precision<usize>) -> protobuf::Precision {
770        match s {
771            Precision::Exact(val) => protobuf::Precision {
772                precision_info: protobuf::PrecisionInfo::Exact.into(),
773                val: Some(crate::protobuf_common::ScalarValue {
774                    value: Some(Value::Uint64Value(*val as u64)),
775                }),
776            },
777            Precision::Inexact(val) => protobuf::Precision {
778                precision_info: protobuf::PrecisionInfo::Inexact.into(),
779                val: Some(crate::protobuf_common::ScalarValue {
780                    value: Some(Value::Uint64Value(*val as u64)),
781                }),
782            },
783            Precision::Absent => protobuf::Precision {
784                precision_info: protobuf::PrecisionInfo::Absent.into(),
785                val: Some(crate::protobuf_common::ScalarValue { value: None }),
786            },
787        }
788    }
789}
790
791impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
792    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
793        match s {
794            Precision::Exact(val) => protobuf::Precision {
795                precision_info: protobuf::PrecisionInfo::Exact.into(),
796                val: val.try_into().ok(),
797            },
798            Precision::Inexact(val) => protobuf::Precision {
799                precision_info: protobuf::PrecisionInfo::Inexact.into(),
800                val: val.try_into().ok(),
801            },
802            Precision::Absent => protobuf::Precision {
803                precision_info: protobuf::PrecisionInfo::Absent.into(),
804                val: Some(crate::protobuf_common::ScalarValue { value: None }),
805            },
806        }
807    }
808}
809
810impl From<&Statistics> for protobuf::Statistics {
811    fn from(s: &Statistics) -> protobuf::Statistics {
812        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
813        protobuf::Statistics {
814            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
815            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
816            column_stats,
817        }
818    }
819}
820
821impl From<&ColumnStatistics> for protobuf::ColumnStats {
822    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
823        protobuf::ColumnStats {
824            min_value: Some(protobuf::Precision::from(&s.min_value)),
825            max_value: Some(protobuf::Precision::from(&s.max_value)),
826            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
827            null_count: Some(protobuf::Precision::from(&s.null_count)),
828            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
829            byte_size: Some(protobuf::Precision::from(&s.byte_size)),
830        }
831    }
832}
833
834impl From<JoinSide> for protobuf::JoinSide {
835    fn from(t: JoinSide) -> Self {
836        match t {
837            JoinSide::Left => protobuf::JoinSide::LeftSide,
838            JoinSide::Right => protobuf::JoinSide::RightSide,
839            JoinSide::None => protobuf::JoinSide::None,
840        }
841    }
842}
843
844impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
845    fn from(value: &CompressionTypeVariant) -> Self {
846        match value {
847            CompressionTypeVariant::GZIP => Self::Gzip,
848            CompressionTypeVariant::BZIP2 => Self::Bzip2,
849            CompressionTypeVariant::XZ => Self::Xz,
850            CompressionTypeVariant::ZSTD => Self::Zstd,
851            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
852        }
853    }
854}
855
856impl From<CsvQuoteStyle> for protobuf::CsvQuoteStyle {
857    fn from(value: CsvQuoteStyle) -> Self {
858        match value {
859            CsvQuoteStyle::Necessary => Self::Necessary,
860            CsvQuoteStyle::Always => Self::Always,
861            CsvQuoteStyle::NonNumeric => Self::NonNumeric,
862            CsvQuoteStyle::Never => Self::Never,
863        }
864    }
865}
866
867impl From<QuoteStyle> for protobuf::CsvQuoteStyle {
868    fn from(value: QuoteStyle) -> Self {
869        match value {
870            QuoteStyle::Necessary => Self::Necessary,
871            QuoteStyle::Always => Self::Always,
872            QuoteStyle::NonNumeric => Self::NonNumeric,
873            QuoteStyle::Never => Self::Never,
874            _ => Self::Necessary,
875        }
876    }
877}
878
879impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
880    type Error = DataFusionError;
881
882    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
883        Ok(csv_writer_options_to_proto(
884            &opts.writer_options,
885            &opts.compression,
886        ))
887    }
888}
889
890impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
891    type Error = DataFusionError;
892
893    fn try_from(
894        opts: &JsonWriterOptions,
895    ) -> datafusion_common::Result<Self, Self::Error> {
896        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
897        Ok(protobuf::JsonWriterOptions {
898            compression: compression.into(),
899        })
900    }
901}
902
903impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
904    type Error = DataFusionError;
905
906    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
907        Ok(protobuf::ParquetOptions {
908            enable_page_index: value.enable_page_index,
909            pruning: value.pruning,
910            skip_metadata: value.skip_metadata,
911            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
912            pushdown_filters: value.pushdown_filters,
913            reorder_filters: value.reorder_filters,
914            force_filter_selections: value.force_filter_selections,
915            data_pagesize_limit: value.data_pagesize_limit as u64,
916            write_batch_size: value.write_batch_size as u64,
917            writer_version: value.writer_version.to_string(),
918            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
919            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
920            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
921            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
922            max_row_group_size: value.max_row_group_size as u64,
923            created_by: value.created_by.clone(),
924            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
925            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
926            data_page_row_count_limit: value.data_page_row_count_limit as u64,
927            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
928            bloom_filter_on_read: value.bloom_filter_on_read,
929            bloom_filter_on_write: value.bloom_filter_on_write,
930            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
931            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
932            allow_single_file_parallelism: value.allow_single_file_parallelism,
933            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
934            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
935            schema_force_view_types: value.schema_force_view_types,
936            binary_as_string: value.binary_as_string,
937            skip_arrow_metadata: value.skip_arrow_metadata,
938            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
939            coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz),
940            max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
941            content_defined_chunking: Some((&value.content_defined_chunking).into()),
942        })
943    }
944}
945
946impl From<&ParquetCdcOptions> for protobuf::ParquetCdcOptions {
947    fn from(value: &ParquetCdcOptions) -> Self {
948        protobuf::ParquetCdcOptions {
949            enabled: value.enabled,
950            min_chunk_size: value.min_chunk_size as u64,
951            max_chunk_size: value.max_chunk_size as u64,
952            norm_level: value.norm_level,
953        }
954    }
955}
956
957impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
958    type Error = DataFusionError;
959
960    fn try_from(
961        value: &ParquetColumnOptions,
962    ) -> datafusion_common::Result<Self, Self::Error> {
963        Ok(protobuf::ParquetColumnOptions {
964            compression_opt: value
965                .compression
966                .clone()
967                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
968            dictionary_enabled_opt: value
969                .dictionary_enabled
970                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
971            statistics_enabled_opt: value
972                .statistics_enabled
973                .clone()
974                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
975            encoding_opt: value
976                .encoding
977                .clone()
978                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
979            bloom_filter_enabled_opt: value
980                .bloom_filter_enabled
981                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
982            bloom_filter_fpp_opt: value
983                .bloom_filter_fpp
984                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
985            bloom_filter_ndv_opt: value
986                .bloom_filter_ndv
987                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
988        })
989    }
990}
991
992impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
993    type Error = DataFusionError;
994    fn try_from(
995        value: &TableParquetOptions,
996    ) -> datafusion_common::Result<Self, Self::Error> {
997        let column_specific_options = value
998            .column_specific_options
999            .iter()
1000            .map(|(k, v)| {
1001                Ok(protobuf::ParquetColumnSpecificOptions {
1002                    column_name: k.into(),
1003                    options: Some(v.try_into()?),
1004                })
1005            })
1006            .collect::<datafusion_common::Result<Vec<_>>>()?;
1007        let key_value_metadata = value
1008            .key_value_metadata
1009            .iter()
1010            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
1011            .collect::<HashMap<String, String>>();
1012
1013        let global: protobuf::ParquetOptions = (&value.global).try_into()?;
1014
1015        Ok(protobuf::TableParquetOptions {
1016            global: Some(global),
1017            column_specific_options,
1018            key_value_metadata,
1019        })
1020    }
1021}
1022
1023impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
1024    type Error = DataFusionError; // Define or use an appropriate error type
1025
1026    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
1027        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
1028        let quote_style: protobuf::CsvQuoteStyle = opts.quote_style.into();
1029        Ok(protobuf::CsvOptions {
1030            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
1031            delimiter: vec![opts.delimiter],
1032            quote: vec![opts.quote],
1033            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
1034            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
1035            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
1036            newlines_in_values: opts
1037                .newlines_in_values
1038                .map_or_else(Vec::new, |h| vec![h as u8]),
1039            compression: compression.into(),
1040            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
1041            date_format: opts.date_format.clone().unwrap_or_default(),
1042            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
1043            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
1044            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
1045            time_format: opts.time_format.clone().unwrap_or_default(),
1046            null_value: opts.null_value.clone().unwrap_or_default(),
1047            null_regex: opts.null_regex.clone().unwrap_or_default(),
1048            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
1049            truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
1050            compression_level: opts.compression_level,
1051            quote_style: quote_style.into(),
1052            ignore_leading_whitespace: opts
1053                .ignore_leading_whitespace
1054                .map_or_else(Vec::new, |h| vec![h as u8]),
1055            ignore_trailing_whitespace: opts
1056                .ignore_trailing_whitespace
1057                .map_or_else(Vec::new, |h| vec![h as u8]),
1058        })
1059    }
1060}
1061
1062impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
1063    type Error = DataFusionError;
1064
1065    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
1066        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
1067        Ok(protobuf::JsonOptions {
1068            compression: compression.into(),
1069            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
1070            compression_level: opts.compression_level,
1071            newline_delimited: Some(opts.newline_delimited),
1072        })
1073    }
1074}
1075
1076/// Creates a scalar protobuf value from an optional value (T), and
1077/// encoding None as the appropriate datatype
1078fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
1079    v: Option<&I>,
1080    null_arrow_type: &DataType,
1081    constructor: T,
1082) -> Result<protobuf::ScalarValue, Error> {
1083    let value = v
1084        .map(constructor)
1085        .unwrap_or(protobuf::scalar_value::Value::NullValue(
1086            null_arrow_type.try_into()?,
1087        ));
1088
1089    Ok(protobuf::ScalarValue { value: Some(value) })
1090}
1091
1092// Nested ScalarValue types (List / FixedSizeList / LargeList / ListView / LargeListView / Struct / Map)
1093// are serialized using Arrow IPC messages as a single column RecordBatch
1094fn encode_scalar_nested_value(
1095    arr: ArrayRef,
1096    val: &ScalarValue,
1097) -> Result<protobuf::ScalarValue, Error> {
1098    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
1099        Error::General(format!(
1100            "Error creating temporary batch while encoding nested ScalarValue: {e}"
1101        ))
1102    })?;
1103
1104    let ipc_gen = IpcDataGenerator {};
1105    let mut dict_tracker = DictionaryTracker::new(false);
1106    let write_options = IpcWriteOptions::default();
1107    // The IPC writer requires pre-allocated dictionary IDs (normally assigned when
1108    // serializing the schema). Populate `dict_tracker` by encoding the schema first.
1109    ipc_gen.schema_to_bytes_with_dictionary_tracker(
1110        batch.schema().as_ref(),
1111        &mut dict_tracker,
1112        &write_options,
1113    );
1114    let mut compression_context = CompressionContext::default();
1115    let (encoded_dictionaries, encoded_message) = ipc_gen
1116        .encode(
1117            &batch,
1118            &mut dict_tracker,
1119            &write_options,
1120            &mut compression_context,
1121        )
1122        .map_err(|e| {
1123            Error::General(format!("Error encoding nested ScalarValue as IPC: {e}"))
1124        })?;
1125
1126    let schema: protobuf::Schema = batch.schema().try_into()?;
1127
1128    let scalar_list_value = protobuf::ScalarNestedValue {
1129        ipc_message: encoded_message.ipc_message,
1130        arrow_data: encoded_message.arrow_data,
1131        dictionaries: encoded_dictionaries
1132            .into_iter()
1133            .map(|data| protobuf::scalar_nested_value::Dictionary {
1134                ipc_message: data.ipc_message,
1135                arrow_data: data.arrow_data,
1136            })
1137            .collect(),
1138        schema: Some(schema),
1139    };
1140
1141    match val {
1142        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
1143            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
1144        }),
1145        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
1146            value: Some(protobuf::scalar_value::Value::LargeListValue(
1147                scalar_list_value,
1148            )),
1149        }),
1150        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
1151            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
1152                scalar_list_value,
1153            )),
1154        }),
1155        ScalarValue::ListView(_) => Ok(protobuf::ScalarValue {
1156            value: Some(protobuf::scalar_value::Value::ListViewValue(
1157                scalar_list_value,
1158            )),
1159        }),
1160        ScalarValue::LargeListView(_) => Ok(protobuf::ScalarValue {
1161            value: Some(protobuf::scalar_value::Value::LargeListViewValue(
1162                scalar_list_value,
1163            )),
1164        }),
1165        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
1166            value: Some(protobuf::scalar_value::Value::StructValue(
1167                scalar_list_value,
1168            )),
1169        }),
1170        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
1171            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
1172        }),
1173        _ => unreachable!(),
1174    }
1175}
1176
1177/// Converts a vector of `Arc<arrow::Field>`s to `protobuf::Field`s
1178fn convert_arc_fields_to_proto_fields<'a, I>(
1179    fields: I,
1180) -> Result<Vec<protobuf::Field>, Error>
1181where
1182    I: IntoIterator<Item = &'a Arc<Field>>,
1183{
1184    fields
1185        .into_iter()
1186        .map(|field| field.as_ref().try_into())
1187        .collect::<Result<Vec<_>, Error>>()
1188}
1189
1190pub(crate) fn csv_writer_options_to_proto(
1191    csv_options: &WriterBuilder,
1192    compression: &CompressionTypeVariant,
1193) -> protobuf::CsvWriterOptions {
1194    let compression: protobuf::CompressionTypeVariant = compression.into();
1195    let quote_style: protobuf::CsvQuoteStyle = csv_options.quote_style().into();
1196    protobuf::CsvWriterOptions {
1197        compression: compression.into(),
1198        delimiter: (csv_options.delimiter() as char).to_string(),
1199        has_header: csv_options.header(),
1200        date_format: csv_options.date_format().unwrap_or("").to_owned(),
1201        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
1202        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
1203        time_format: csv_options.time_format().unwrap_or("").to_owned(),
1204        null_value: csv_options.null().to_owned(),
1205        quote: (csv_options.quote() as char).to_string(),
1206        escape: (csv_options.escape() as char).to_string(),
1207        double_quote: csv_options.double_quote(),
1208        quote_style: quote_style.into(),
1209        ignore_leading_whitespace: csv_options.ignore_leading_whitespace(),
1210        ignore_trailing_whitespace: csv_options.ignore_trailing_whitespace(),
1211    }
1212}