// datafusion_proto_common/to_proto/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23    arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29    SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
32use datafusion_common::{
33    config::{
34        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
35        TableParquetOptions,
36    },
37    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
38    parsers::CompressionTypeVariant,
39    plan_datafusion_err,
40    stats::Precision,
41    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
42    DataFusionError, JoinSide, ScalarValue, Statistics,
43};
44
/// Errors that can arise while serializing DataFusion/arrow types to their
/// protobuf representation.
#[derive(Debug)]
pub enum Error {
    /// Catch-all serialization failure with a free-form description.
    General(String),

    /// The scalar value cannot be represented in protobuf.
    InvalidScalarValue(ScalarValue),

    /// The data type cannot be represented as a protobuf scalar type.
    InvalidScalarType(DataType),

    /// The time unit is not valid in the context it was used.
    InvalidTimeUnit(TimeUnit),

    /// The conversion is recognized but not yet implemented.
    NotImplemented(String),
}

// Marker impl: `Display` + `Debug` above satisfy the trait's requirements.
impl std::error::Error for Error {}
59
60impl std::fmt::Display for Error {
61    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62        match self {
63            Self::General(desc) => write!(f, "General error: {desc}"),
64            Self::InvalidScalarValue(value) => {
65                write!(f, "{value:?} is invalid as a DataFusion scalar value")
66            }
67            Self::InvalidScalarType(data_type) => {
68                write!(f, "{data_type:?} is invalid as a DataFusion scalar type")
69            }
70            Self::InvalidTimeUnit(time_unit) => {
71                write!(
72                    f,
73                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
74                )
75            }
76            Self::NotImplemented(s) => {
77                write!(f, "Not implemented: {s}")
78            }
79        }
80    }
81}
82
impl From<Error> for DataFusionError {
    /// Converts a serialization error into a `DataFusionError` via the
    /// `plan_datafusion_err!` macro, preserving the `Display` message.
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
88
impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    /// Serializes an arrow `Field`: name, data type, nullability and metadata.
    ///
    /// `children` is deliberately left empty here — nested field information
    /// travels inside `arrow_type` instead (see the `List`/`Struct` arms of
    /// the `ArrowTypeEnum` conversion).
    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}
103
104impl TryFrom<&DataType> for protobuf::ArrowType {
105    type Error = Error;
106
107    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
108        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
109        Ok(Self {
110            arrow_type_enum: Some(arrow_type_enum),
111        })
112    }
113}
114
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    /// Maps an arrow `DataType` onto the protobuf `ArrowTypeEnum` oneof.
    ///
    /// Parameterless types are encoded as `EmptyMessage`; parameterized
    /// types (timestamps, lists, unions, decimals, …) carry their parameters
    /// in dedicated messages. `RunEndEncoded`, `ListView` and
    /// `LargeListView` are not yet supported and produce `Error::General`.
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                // A missing timezone is encoded as an empty string on the wire.
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            // Nested types recurse through `TryFrom<&Field>` for their items.
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                // Type ids and their fields are serialized as parallel lists.
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(fields.iter().map(|(_, item)|item))?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal128(precision, scale) => Self::Decimal(protobuf::Decimal {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => {
                Self::Map(Box::new(
                    protobuf::Map {
                        field_type: Some(Box::new(field.as_ref().try_into()?)),
                        keys_sorted: *sorted,
                    }
                ))
            }
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
            }
        };

        Ok(res)
    }
}
221
222impl From<Column> for protobuf::Column {
223    fn from(c: Column) -> Self {
224        Self {
225            relation: c.relation.map(|relation| protobuf::ColumnRelation {
226                relation: relation.to_string(),
227            }),
228            name: c.name,
229        }
230    }
231}
232
233impl From<&Column> for protobuf::Column {
234    fn from(c: &Column) -> Self {
235        c.clone().into()
236    }
237}
238
239impl TryFrom<&Schema> for protobuf::Schema {
240    type Error = Error;
241
242    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
243        Ok(Self {
244            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
245            metadata: schema.metadata.clone(),
246        })
247    }
248}
249
250impl TryFrom<SchemaRef> for protobuf::Schema {
251    type Error = Error;
252
253    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
254        Ok(Self {
255            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
256            metadata: schema.metadata.clone(),
257        })
258    }
259}
260
impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    /// Serializes a DataFusion schema, preserving each field's optional
    /// table qualifier alongside the field definition itself.
    ///
    /// Fails with the first field whose type cannot be serialized.
    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}
282
283impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
284    type Error = Error;
285
286    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
287        s.as_ref().try_into()
288    }
289}
290
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    /// Serializes a DataFusion `ScalarValue` into its protobuf wire form.
    ///
    /// Simple (primitive) variants go through `create_proto_scalar`, which
    /// also handles the `None` (null) case uniformly. Nested variants
    /// (lists, structs, maps) are delegated to `encode_scalar_nested_value`.
    /// Null values of typed variants are encoded as `Value::NullValue`
    /// carrying the serialized data type so the type survives round-trips.
    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            // Float16 has no dedicated proto field; it is widened to f32.
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            // Narrow integers are widened to 32-bit proto fields; the
            // declared data_type preserves the original width.
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            // Nested scalars are serialized through the array-based encoder.
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            // Timestamps: a missing timezone is encoded as an empty string.
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            // Decimals are shipped as big-endian bytes plus precision/scale.
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            // The untyped null still records its (Null) data type.
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            // Interval values are decomposed into their component parts.
            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            // Union scalars ship the full field list so the type can be
            // reconstructed; a null union gets value_id 0 and no value.
            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            // Dictionary scalars serialize the key type plus the inner value
            // (recursing through this same conversion).
            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}
640
641impl From<&TimeUnit> for protobuf::TimeUnit {
642    fn from(val: &TimeUnit) -> Self {
643        match val {
644            TimeUnit::Second => protobuf::TimeUnit::Second,
645            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
646            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
647            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
648        }
649    }
650}
651
652impl From<&IntervalUnit> for protobuf::IntervalUnit {
653    fn from(interval_unit: &IntervalUnit) -> Self {
654        match interval_unit {
655            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
656            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
657            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
658        }
659    }
660}
661
662impl From<Constraints> for protobuf::Constraints {
663    fn from(value: Constraints) -> Self {
664        let constraints = value.into_iter().map(|item| item.into()).collect();
665        protobuf::Constraints { constraints }
666    }
667}
668
669impl From<Constraint> for protobuf::Constraint {
670    fn from(value: Constraint) -> Self {
671        let res = match value {
672            Constraint::PrimaryKey(indices) => {
673                let indices = indices.into_iter().map(|item| item as u64).collect();
674                protobuf::constraint::ConstraintMode::PrimaryKey(
675                    protobuf::PrimaryKeyConstraint { indices },
676                )
677            }
678            Constraint::Unique(indices) => {
679                let indices = indices.into_iter().map(|item| item as u64).collect();
680                protobuf::constraint::ConstraintMode::PrimaryKey(
681                    protobuf::PrimaryKeyConstraint { indices },
682                )
683            }
684        };
685        protobuf::Constraint {
686            constraint_mode: Some(res),
687        }
688    }
689}
690
691impl From<&Precision<usize>> for protobuf::Precision {
692    fn from(s: &Precision<usize>) -> protobuf::Precision {
693        match s {
694            Precision::Exact(val) => protobuf::Precision {
695                precision_info: protobuf::PrecisionInfo::Exact.into(),
696                val: Some(crate::protobuf_common::ScalarValue {
697                    value: Some(Value::Uint64Value(*val as u64)),
698                }),
699            },
700            Precision::Inexact(val) => protobuf::Precision {
701                precision_info: protobuf::PrecisionInfo::Inexact.into(),
702                val: Some(crate::protobuf_common::ScalarValue {
703                    value: Some(Value::Uint64Value(*val as u64)),
704                }),
705            },
706            Precision::Absent => protobuf::Precision {
707                precision_info: protobuf::PrecisionInfo::Absent.into(),
708                val: Some(crate::protobuf_common::ScalarValue { value: None }),
709            },
710        }
711    }
712}
713
impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    /// Serializes a scalar-valued precision (e.g. min/max column stats).
    ///
    /// NOTE(review): `try_into().ok()` silently drops scalar serialization
    /// errors, leaving `val` as `None` — the statistic degrades to "no
    /// value" rather than failing. Confirm this lossy behavior is intended.
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}
732
733impl From<&Statistics> for protobuf::Statistics {
734    fn from(s: &Statistics) -> protobuf::Statistics {
735        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
736        protobuf::Statistics {
737            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
738            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
739            column_stats,
740        }
741    }
742}
743
744impl From<&ColumnStatistics> for protobuf::ColumnStats {
745    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
746        protobuf::ColumnStats {
747            min_value: Some(protobuf::Precision::from(&s.min_value)),
748            max_value: Some(protobuf::Precision::from(&s.max_value)),
749            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
750            null_count: Some(protobuf::Precision::from(&s.null_count)),
751            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
752        }
753    }
754}
755
756impl From<JoinSide> for protobuf::JoinSide {
757    fn from(t: JoinSide) -> Self {
758        match t {
759            JoinSide::Left => protobuf::JoinSide::LeftSide,
760            JoinSide::Right => protobuf::JoinSide::RightSide,
761            JoinSide::None => protobuf::JoinSide::None,
762        }
763    }
764}
765
766impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
767    fn from(value: &CompressionTypeVariant) -> Self {
768        match value {
769            CompressionTypeVariant::GZIP => Self::Gzip,
770            CompressionTypeVariant::BZIP2 => Self::Bzip2,
771            CompressionTypeVariant::XZ => Self::Xz,
772            CompressionTypeVariant::ZSTD => Self::Zstd,
773            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
774        }
775    }
776}
777
778impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
779    type Error = DataFusionError;
780
781    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
782        Ok(csv_writer_options_to_proto(
783            &opts.writer_options,
784            &opts.compression,
785        ))
786    }
787}
788
789impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
790    type Error = DataFusionError;
791
792    fn try_from(
793        opts: &JsonWriterOptions,
794    ) -> datafusion_common::Result<Self, Self::Error> {
795        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
796        Ok(protobuf::JsonWriterOptions {
797            compression: compression.into(),
798        })
799    }
800}
801
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    /// Converts session-level parquet options into their protobuf form.
    ///
    /// Conventions used throughout this mapping:
    /// - `usize` sizes/limits are widened to `u64` for the wire format;
    /// - `Option` fields map to `*_opt` oneof wrappers (`None` => unset);
    /// - string-typed settings are cloned verbatim.
    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            // `max_statistics_size` is marked deprecated at its definition but
            // is still round-tripped for wire compatibility.
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| protobuf::parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v as u64)),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
        })
    }
}
842
impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    /// Converts per-column parquet overrides into their protobuf form.
    ///
    /// Every field is optional and maps to a `*_opt` oneof wrapper; `None`
    /// leaves the wrapper unset so the column inherits the global setting.
    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            // `max_statistics_size` is marked deprecated at its definition but
            // is still round-tripped for wire compatibility. Note the column
            // form is `u32` (the global ParquetOptions form uses `u64`).
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| {
                protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(
                    v as u32,
                )
            }),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}
883
884impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
885    type Error = DataFusionError;
886    fn try_from(
887        value: &TableParquetOptions,
888    ) -> datafusion_common::Result<Self, Self::Error> {
889        let column_specific_options = value
890            .column_specific_options
891            .iter()
892            .map(|(k, v)| {
893                Ok(protobuf::ParquetColumnSpecificOptions {
894                    column_name: k.into(),
895                    options: Some(v.try_into()?),
896                })
897            })
898            .collect::<datafusion_common::Result<Vec<_>>>()?;
899        let key_value_metadata = value
900            .key_value_metadata
901            .iter()
902            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
903            .collect::<HashMap<String, String>>();
904        Ok(protobuf::TableParquetOptions {
905            global: Some((&value.global).try_into()?),
906            column_specific_options,
907            key_value_metadata,
908        })
909    }
910}
911
912impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
913    type Error = DataFusionError; // Define or use an appropriate error type
914
915    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
916        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
917        Ok(protobuf::CsvOptions {
918            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
919            delimiter: vec![opts.delimiter],
920            quote: vec![opts.quote],
921            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
922            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
923            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
924            newlines_in_values: opts
925                .newlines_in_values
926                .map_or_else(Vec::new, |h| vec![h as u8]),
927            compression: compression.into(),
928            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
929            date_format: opts.date_format.clone().unwrap_or_default(),
930            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
931            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
932            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
933            time_format: opts.time_format.clone().unwrap_or_default(),
934            null_value: opts.null_value.clone().unwrap_or_default(),
935            null_regex: opts.null_regex.clone().unwrap_or_default(),
936            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
937        })
938    }
939}
940
941impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
942    type Error = DataFusionError;
943
944    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
945        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
946        Ok(protobuf::JsonOptions {
947            compression: compression.into(),
948            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
949        })
950    }
951}
952
953/// Creates a scalar protobuf value from an optional value (T), and
954/// encoding None as the appropriate datatype
955fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
956    v: Option<&I>,
957    null_arrow_type: &DataType,
958    constructor: T,
959) -> Result<protobuf::ScalarValue, Error> {
960    let value = v
961        .map(constructor)
962        .unwrap_or(protobuf::scalar_value::Value::NullValue(
963            null_arrow_type.try_into()?,
964        ));
965
966    Ok(protobuf::ScalarValue { value: Some(value) })
967}
968
// ScalarValue::List / FixedSizeList / LargeList / Struct / Map are serialized using
// Arrow IPC messages as a single column RecordBatch.
// `val` only selects which protobuf variant wraps the payload; the data
// itself comes from `arr`.
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    // Wrap the array in a single-column batch so the IPC writer machinery
    // can serialize it; "field_name" is a placeholder column name only.
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    let gen = IpcDataGenerator {};
    // NOTE(review): `false` presumably disables erroring on dictionary
    // replacement — confirm against the arrow-ipc DictionaryTracker API.
    let mut dict_tracker = DictionaryTracker::new(false);
    // Produces the encoded batch body plus any dictionary batches it needs.
    let (encoded_dictionaries, encoded_message) = gen
        .encoded_batch(&batch, &mut dict_tracker, &Default::default())
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    // Ship the schema alongside the IPC payload so the reader can decode
    // without extra context.
    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    // Pick the protobuf variant matching the scalar's nested kind.
    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        // Invariant: callers only pass the nested variants matched above.
        _ => unreachable!(),
    }
}
1029
1030/// Converts a vector of `Arc<arrow::Field>`s to `protobuf::Field`s
1031fn convert_arc_fields_to_proto_fields<'a, I>(
1032    fields: I,
1033) -> Result<Vec<protobuf::Field>, Error>
1034where
1035    I: IntoIterator<Item = &'a Arc<Field>>,
1036{
1037    fields
1038        .into_iter()
1039        .map(|field| field.as_ref().try_into())
1040        .collect::<Result<Vec<_>, Error>>()
1041}
1042
1043pub(crate) fn csv_writer_options_to_proto(
1044    csv_options: &WriterBuilder,
1045    compression: &CompressionTypeVariant,
1046) -> protobuf::CsvWriterOptions {
1047    let compression: protobuf::CompressionTypeVariant = compression.into();
1048    protobuf::CsvWriterOptions {
1049        compression: compression.into(),
1050        delimiter: (csv_options.delimiter() as char).to_string(),
1051        has_header: csv_options.header(),
1052        date_format: csv_options.date_format().unwrap_or("").to_owned(),
1053        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
1054        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
1055        time_format: csv_options.time_format().unwrap_or("").to_owned(),
1056        null_value: csv_options.null().to_owned(),
1057        quote: (csv_options.quote() as char).to_string(),
1058        escape: (csv_options.escape() as char).to_string(),
1059        double_quote: csv_options.double_quote(),
1060    }
1061}