datafusion_proto_common/to_proto/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23    arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29    SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
32use datafusion_common::{
33    config::{
34        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
35        TableParquetOptions,
36    },
37    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
38    parsers::CompressionTypeVariant,
39    plan_datafusion_err,
40    stats::Precision,
41    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
42    DataFusionError, JoinSide, ScalarValue, Statistics,
43};
44
/// Errors that can occur while serializing DataFusion/Arrow types into
/// their protobuf representations.
#[derive(Debug)]
pub enum Error {
    /// Catch-all serialization failure carrying a free-form message.
    General(String),

    /// The scalar value has no valid protobuf representation.
    InvalidScalarValue(ScalarValue),

    /// The data type cannot be used as a scalar type in protobuf.
    InvalidScalarType(DataType),

    /// The time unit is not valid for the attempted conversion.
    InvalidTimeUnit(TimeUnit),

    /// The conversion is recognized but not yet implemented.
    NotImplemented(String),
}
57
// Marker impl: `Error` has no wrapped source error, so the default
// `std::error::Error` methods are sufficient.
impl std::error::Error for Error {}
59
60impl std::fmt::Display for Error {
61    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62        match self {
63            Self::General(desc) => write!(f, "General error: {desc}"),
64            Self::InvalidScalarValue(value) => {
65                write!(f, "{value:?} is invalid as a DataFusion scalar value")
66            }
67            Self::InvalidScalarType(data_type) => {
68                write!(f, "{data_type:?} is invalid as a DataFusion scalar type")
69            }
70            Self::InvalidTimeUnit(time_unit) => {
71                write!(
72                    f,
73                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
74                )
75            }
76            Self::NotImplemented(s) => {
77                write!(f, "Not implemented: {s}")
78            }
79        }
80    }
81}
82
83impl From<Error> for DataFusionError {
84    fn from(e: Error) -> Self {
85        plan_datafusion_err!("{}", e)
86    }
87}
88
89impl TryFrom<&Field> for protobuf::Field {
90    type Error = Error;
91
92    fn try_from(field: &Field) -> Result<Self, Self::Error> {
93        let arrow_type = field.data_type().try_into()?;
94        Ok(Self {
95            name: field.name().to_owned(),
96            arrow_type: Some(Box::new(arrow_type)),
97            nullable: field.is_nullable(),
98            children: Vec::new(),
99            metadata: field.metadata().clone(),
100            dict_ordered: field.dict_is_ordered().unwrap_or(false),
101        })
102    }
103}
104
105impl TryFrom<&DataType> for protobuf::ArrowType {
106    type Error = Error;
107
108    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
109        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
110        Ok(Self {
111            arrow_type_enum: Some(arrow_type_enum),
112        })
113    }
114}
115
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    /// Maps an Arrow `DataType` onto the protobuf `ArrowTypeEnum` oneof.
    ///
    /// Parameterless types become `EmptyMessage`; parameterized types carry
    /// their unit/size/child-field payloads. `RunEndEncoded`, `ListView`,
    /// and `LargeListView` are rejected as not yet supported.
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            // A missing timezone is encoded as the empty string.
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            // Nested types recurse through `TryFrom<&Field>` on their
            // child field(s).
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            // Union fields and their type ids are encoded as two parallel
            // lists, in the same iteration order.
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(fields.iter().map(|(_, item)|item))?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal128(precision, scale) => Self::Decimal(protobuf::Decimal {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => {
                Self::Map(Box::new(
                    protobuf::Map {
                        field_type: Some(Box::new(field.as_ref().try_into()?)),
                        keys_sorted: *sorted,
                    }
                ))
            }
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
            }
        };

        Ok(res)
    }
}
222
223impl From<Column> for protobuf::Column {
224    fn from(c: Column) -> Self {
225        Self {
226            relation: c.relation.map(|relation| protobuf::ColumnRelation {
227                relation: relation.to_string(),
228            }),
229            name: c.name,
230        }
231    }
232}
233
234impl From<&Column> for protobuf::Column {
235    fn from(c: &Column) -> Self {
236        c.clone().into()
237    }
238}
239
240impl TryFrom<&Schema> for protobuf::Schema {
241    type Error = Error;
242
243    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
244        Ok(Self {
245            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
246            metadata: schema.metadata.clone(),
247        })
248    }
249}
250
251impl TryFrom<SchemaRef> for protobuf::Schema {
252    type Error = Error;
253
254    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
255        Ok(Self {
256            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
257            metadata: schema.metadata.clone(),
258        })
259    }
260}
261
262impl TryFrom<&DFSchema> for protobuf::DfSchema {
263    type Error = Error;
264
265    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
266        let columns = s
267            .iter()
268            .map(|(qualifier, field)| {
269                Ok(protobuf::DfField {
270                    field: Some(field.as_ref().try_into()?),
271                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
272                        relation: r.to_string(),
273                    }),
274                })
275            })
276            .collect::<Result<Vec<_>, Error>>()?;
277        Ok(Self {
278            columns,
279            metadata: s.metadata().clone(),
280        })
281    }
282}
283
284impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
285    type Error = Error;
286
287    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
288        s.as_ref().try_into()
289    }
290}
291
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    /// Converts a DataFusion `ScalarValue` into its protobuf message.
    ///
    /// Primitive variants are encoded via the `create_proto_scalar` helper
    /// (defined elsewhere in this module); nested variants (List, LargeList,
    /// FixedSizeList, Struct, Map) go through `encode_scalar_nested_value`.
    /// Null payloads are represented as a typed `NullValue` built from the
    /// scalar's `DataType`.
    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        // Captured up front so null cases can encode a typed NullValue.
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            // Float16 has no dedicated proto field; it is widened to f32.
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            // Narrow integer types are widened to the 32-bit proto fields.
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            // Nested scalars are serialized through the dedicated helper,
            // which operates on the backing array.
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            // Timestamps carry their timezone as a string; a missing
            // timezone is encoded as "".
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            // Decimals are encoded as big-endian bytes plus precision (p)
            // and scale (s); a missing value becomes a typed null.
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            // Intervals are decomposed into their component parts before
            // encoding.
            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            // Union scalars encode all member fields plus the id/value of
            // the active member; an absent value encodes value_id = 0.
            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            // Dictionary scalars recurse on the inner value.
            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}
641
642impl From<&TimeUnit> for protobuf::TimeUnit {
643    fn from(val: &TimeUnit) -> Self {
644        match val {
645            TimeUnit::Second => protobuf::TimeUnit::Second,
646            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
647            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
648            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
649        }
650    }
651}
652
653impl From<&IntervalUnit> for protobuf::IntervalUnit {
654    fn from(interval_unit: &IntervalUnit) -> Self {
655        match interval_unit {
656            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
657            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
658            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
659        }
660    }
661}
662
663impl From<Constraints> for protobuf::Constraints {
664    fn from(value: Constraints) -> Self {
665        let constraints = value.into_iter().map(|item| item.into()).collect();
666        protobuf::Constraints { constraints }
667    }
668}
669
670impl From<Constraint> for protobuf::Constraint {
671    fn from(value: Constraint) -> Self {
672        let res = match value {
673            Constraint::PrimaryKey(indices) => {
674                let indices = indices.into_iter().map(|item| item as u64).collect();
675                protobuf::constraint::ConstraintMode::PrimaryKey(
676                    protobuf::PrimaryKeyConstraint { indices },
677                )
678            }
679            Constraint::Unique(indices) => {
680                let indices = indices.into_iter().map(|item| item as u64).collect();
681                protobuf::constraint::ConstraintMode::PrimaryKey(
682                    protobuf::PrimaryKeyConstraint { indices },
683                )
684            }
685        };
686        protobuf::Constraint {
687            constraint_mode: Some(res),
688        }
689    }
690}
691
692impl From<&Precision<usize>> for protobuf::Precision {
693    fn from(s: &Precision<usize>) -> protobuf::Precision {
694        match s {
695            Precision::Exact(val) => protobuf::Precision {
696                precision_info: protobuf::PrecisionInfo::Exact.into(),
697                val: Some(crate::protobuf_common::ScalarValue {
698                    value: Some(Value::Uint64Value(*val as u64)),
699                }),
700            },
701            Precision::Inexact(val) => protobuf::Precision {
702                precision_info: protobuf::PrecisionInfo::Inexact.into(),
703                val: Some(crate::protobuf_common::ScalarValue {
704                    value: Some(Value::Uint64Value(*val as u64)),
705                }),
706            },
707            Precision::Absent => protobuf::Precision {
708                precision_info: protobuf::PrecisionInfo::Absent.into(),
709                val: Some(crate::protobuf_common::ScalarValue { value: None }),
710            },
711        }
712    }
713}
714
715impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
716    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
717        match s {
718            Precision::Exact(val) => protobuf::Precision {
719                precision_info: protobuf::PrecisionInfo::Exact.into(),
720                val: val.try_into().ok(),
721            },
722            Precision::Inexact(val) => protobuf::Precision {
723                precision_info: protobuf::PrecisionInfo::Inexact.into(),
724                val: val.try_into().ok(),
725            },
726            Precision::Absent => protobuf::Precision {
727                precision_info: protobuf::PrecisionInfo::Absent.into(),
728                val: Some(crate::protobuf_common::ScalarValue { value: None }),
729            },
730        }
731    }
732}
733
734impl From<&Statistics> for protobuf::Statistics {
735    fn from(s: &Statistics) -> protobuf::Statistics {
736        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
737        protobuf::Statistics {
738            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
739            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
740            column_stats,
741        }
742    }
743}
744
745impl From<&ColumnStatistics> for protobuf::ColumnStats {
746    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
747        protobuf::ColumnStats {
748            min_value: Some(protobuf::Precision::from(&s.min_value)),
749            max_value: Some(protobuf::Precision::from(&s.max_value)),
750            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
751            null_count: Some(protobuf::Precision::from(&s.null_count)),
752            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
753        }
754    }
755}
756
757impl From<JoinSide> for protobuf::JoinSide {
758    fn from(t: JoinSide) -> Self {
759        match t {
760            JoinSide::Left => protobuf::JoinSide::LeftSide,
761            JoinSide::Right => protobuf::JoinSide::RightSide,
762            JoinSide::None => protobuf::JoinSide::None,
763        }
764    }
765}
766
767impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
768    fn from(value: &CompressionTypeVariant) -> Self {
769        match value {
770            CompressionTypeVariant::GZIP => Self::Gzip,
771            CompressionTypeVariant::BZIP2 => Self::Bzip2,
772            CompressionTypeVariant::XZ => Self::Xz,
773            CompressionTypeVariant::ZSTD => Self::Zstd,
774            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
775        }
776    }
777}
778
779impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
780    type Error = DataFusionError;
781
782    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
783        Ok(csv_writer_options_to_proto(
784            &opts.writer_options,
785            &opts.compression,
786        ))
787    }
788}
789
790impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
791    type Error = DataFusionError;
792
793    fn try_from(
794        opts: &JsonWriterOptions,
795    ) -> datafusion_common::Result<Self, Self::Error> {
796        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
797        Ok(protobuf::JsonWriterOptions {
798            compression: compression.into(),
799        })
800    }
801}
802
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    /// Serializes session-level parquet options into their protobuf form.
    ///
    /// Field-by-field mapping: `Option`-valued settings become proto `oneof`
    /// wrapper variants (the `*_opt` fields, `None` omits the field entirely);
    /// `usize` values are widened to `u64`; owned `String` settings are
    /// cloned. The conversion itself never fails today, but the `TryFrom`
    /// shape matches the other options impls in this module.
    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            // Reader-side settings
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            // Writer-side settings
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            // `max_statistics_size` is deprecated upstream but still
            // round-tripped for backward compatibility.
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| protobuf::parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v as u64)),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            // Bloom filter settings
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            // Parallelism / schema-coercion settings
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
        })
    }
}
843
impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    /// Serializes per-column parquet overrides into their protobuf form.
    ///
    /// Every setting is optional: `Some` values are wrapped in the matching
    /// proto `oneof` variant (`*_opt` fields) and `None` omits the field.
    /// Conversion never fails today; the `TryFrom` shape matches the other
    /// options impls in this module.
    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            // `max_statistics_size` is deprecated upstream but still
            // round-tripped for backward compatibility. Note the proto uses
            // u32 here (the session-level option uses u64).
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| {
                protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(
                    v as u32,
                )
            }),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}
884
885impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
886    type Error = DataFusionError;
887    fn try_from(
888        value: &TableParquetOptions,
889    ) -> datafusion_common::Result<Self, Self::Error> {
890        let column_specific_options = value
891            .column_specific_options
892            .iter()
893            .map(|(k, v)| {
894                Ok(protobuf::ParquetColumnSpecificOptions {
895                    column_name: k.into(),
896                    options: Some(v.try_into()?),
897                })
898            })
899            .collect::<datafusion_common::Result<Vec<_>>>()?;
900        let key_value_metadata = value
901            .key_value_metadata
902            .iter()
903            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
904            .collect::<HashMap<String, String>>();
905        Ok(protobuf::TableParquetOptions {
906            global: Some((&value.global).try_into()?),
907            column_specific_options,
908            key_value_metadata,
909        })
910    }
911}
912
913impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
914    type Error = DataFusionError; // Define or use an appropriate error type
915
916    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
917        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
918        Ok(protobuf::CsvOptions {
919            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
920            delimiter: vec![opts.delimiter],
921            quote: vec![opts.quote],
922            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
923            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
924            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
925            newlines_in_values: opts
926                .newlines_in_values
927                .map_or_else(Vec::new, |h| vec![h as u8]),
928            compression: compression.into(),
929            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
930            date_format: opts.date_format.clone().unwrap_or_default(),
931            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
932            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
933            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
934            time_format: opts.time_format.clone().unwrap_or_default(),
935            null_value: opts.null_value.clone().unwrap_or_default(),
936            null_regex: opts.null_regex.clone().unwrap_or_default(),
937            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
938        })
939    }
940}
941
942impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
943    type Error = DataFusionError;
944
945    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
946        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
947        Ok(protobuf::JsonOptions {
948            compression: compression.into(),
949            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
950        })
951    }
952}
953
954/// Creates a scalar protobuf value from an optional value (T), and
955/// encoding None as the appropriate datatype
956fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
957    v: Option<&I>,
958    null_arrow_type: &DataType,
959    constructor: T,
960) -> Result<protobuf::ScalarValue, Error> {
961    let value = v
962        .map(constructor)
963        .unwrap_or(protobuf::scalar_value::Value::NullValue(
964            null_arrow_type.try_into()?,
965        ));
966
967    Ok(protobuf::ScalarValue { value: Some(value) })
968}
969
970// ScalarValue::List / FixedSizeList / LargeList / Struct / Map are serialized using
971// Arrow IPC messages as a single column RecordBatch
972fn encode_scalar_nested_value(
973    arr: ArrayRef,
974    val: &ScalarValue,
975) -> Result<protobuf::ScalarValue, Error> {
976    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
977        Error::General(format!(
978            "Error creating temporary batch while encoding ScalarValue::List: {e}"
979        ))
980    })?;
981
982    let gen = IpcDataGenerator {};
983    let mut dict_tracker = DictionaryTracker::new(false);
984    let (encoded_dictionaries, encoded_message) = gen
985        .encoded_batch(&batch, &mut dict_tracker, &Default::default())
986        .map_err(|e| {
987            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
988        })?;
989
990    let schema: protobuf::Schema = batch.schema().try_into()?;
991
992    let scalar_list_value = protobuf::ScalarNestedValue {
993        ipc_message: encoded_message.ipc_message,
994        arrow_data: encoded_message.arrow_data,
995        dictionaries: encoded_dictionaries
996            .into_iter()
997            .map(|data| protobuf::scalar_nested_value::Dictionary {
998                ipc_message: data.ipc_message,
999                arrow_data: data.arrow_data,
1000            })
1001            .collect(),
1002        schema: Some(schema),
1003    };
1004
1005    match val {
1006        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
1007            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
1008        }),
1009        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
1010            value: Some(protobuf::scalar_value::Value::LargeListValue(
1011                scalar_list_value,
1012            )),
1013        }),
1014        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
1015            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
1016                scalar_list_value,
1017            )),
1018        }),
1019        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
1020            value: Some(protobuf::scalar_value::Value::StructValue(
1021                scalar_list_value,
1022            )),
1023        }),
1024        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
1025            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
1026        }),
1027        _ => unreachable!(),
1028    }
1029}
1030
1031/// Converts a vector of `Arc<arrow::Field>`s to `protobuf::Field`s
1032fn convert_arc_fields_to_proto_fields<'a, I>(
1033    fields: I,
1034) -> Result<Vec<protobuf::Field>, Error>
1035where
1036    I: IntoIterator<Item = &'a Arc<Field>>,
1037{
1038    fields
1039        .into_iter()
1040        .map(|field| field.as_ref().try_into())
1041        .collect::<Result<Vec<_>, Error>>()
1042}
1043
1044pub(crate) fn csv_writer_options_to_proto(
1045    csv_options: &WriterBuilder,
1046    compression: &CompressionTypeVariant,
1047) -> protobuf::CsvWriterOptions {
1048    let compression: protobuf::CompressionTypeVariant = compression.into();
1049    protobuf::CsvWriterOptions {
1050        compression: compression.into(),
1051        delimiter: (csv_options.delimiter() as char).to_string(),
1052        has_header: csv_options.header(),
1053        date_format: csv_options.date_format().unwrap_or("").to_owned(),
1054        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
1055        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
1056        time_format: csv_options.time_format().unwrap_or("").to_owned(),
1057        null_value: csv_options.null().to_owned(),
1058        quote: (csv_options.quote() as char).to_string(),
1059        escape: (csv_options.escape() as char).to_string(),
1060        double_quote: csv_options.double_quote(),
1061    }
1062}