datafusion_proto_common/to_proto/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

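//! Conversion of common DataFusion and Arrow types (schemas, data types,
//! scalar values, statistics, constraints and file-format options) into the
//! generated `protobuf_common` message types.
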
use std::collections::HashMap;
use std::sync::Arc;

use crate::protobuf_common as protobuf;
use crate::protobuf_common::{
    arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{
    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
    SchemaRef, TimeUnit, UnionMode,
};
use arrow::ipc::writer::{
    CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
};
use datafusion_common::{
    config::{
        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
        TableParquetOptions,
    },
    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
    parsers::CompressionTypeVariant,
    plan_datafusion_err,
    stats::Precision,
    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, JoinSide, ScalarValue, Statistics,
};

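/// Errors that can occur while converting DataFusion and Arrow types to
/// their protobuf representations.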
#[derive(Debug)]
pub enum Error {
    General(String),

    InvalidScalarValue(ScalarValue),

    InvalidScalarType(DataType),

    InvalidTimeUnit(TimeUnit),

    NotImplemented(String),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::General(desc) => write!(f, "General error: {desc}"),
            Self::InvalidScalarValue(value) => {
                write!(f, "{value:?} is invalid as a DataFusion scalar value")
            }
            Self::InvalidScalarType(data_type) => {
                write!(f, "{data_type} is invalid as a DataFusion scalar type")
            }
            Self::InvalidTimeUnit(time_unit) => {
                write!(
                    f,
                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
                )
            }
            Self::NotImplemented(s) => {
                write!(f, "Not implemented: {s}")
            }
        }
    }
}

impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}

impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}

impl TryFrom<&DataType> for protobuf::ArrowType {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
        Ok(Self {
            arrow_type_enum: Some(arrow_type_enum),
        })
    }
}

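// Maps each Arrow `DataType` to the matching variant of the generated
// `arrow_type::ArrowTypeEnum` oneof. `RunEndEncoded`, `ListView` and
// `LargeListView` have no protobuf representation and are rejected.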
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(fields.iter().map(|(_, item)|item))?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal32(precision, scale) => Self::Decimal32(protobuf::Decimal32Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal64(precision, scale) => Self::Decimal64(protobuf::Decimal64Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal128(precision, scale) => Self::Decimal128(protobuf::Decimal128Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => {
                Self::Map(Box::new(
                    protobuf::Map {
                        field_type: Some(Box::new(field.as_ref().try_into()?)),
                        keys_sorted: *sorted,
                    }
                ))
            }
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
            }
        };

        Ok(res)
    }
}

impl From<Column> for protobuf::Column {
    fn from(c: Column) -> Self {
        Self {
            relation: c.relation.map(|relation| protobuf::ColumnRelation {
                relation: relation.to_string(),
            }),
            name: c.name,
        }
    }
}

impl From<&Column> for protobuf::Column {
    fn from(c: &Column) -> Self {
        c.clone().into()
    }
}

impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

impl TryFrom<SchemaRef> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}

impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}

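// Conversion strategy for scalars: primitive values go through
// `create_proto_scalar`, which encodes `None` as a typed null, while nested
// values (List / LargeList / FixedSizeList / Struct / Map) are serialized as
// Arrow IPC data via `encode_scalar_nested_value`.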
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::Decimal32(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal32Value(protobuf::Decimal32 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal64(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal64Value(protobuf::Decimal64 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}

impl From<&TimeUnit> for protobuf::TimeUnit {
    fn from(val: &TimeUnit) -> Self {
        match val {
            TimeUnit::Second => protobuf::TimeUnit::Second,
            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
        }
    }
}

impl From<&IntervalUnit> for protobuf::IntervalUnit {
    fn from(interval_unit: &IntervalUnit) -> Self {
        match interval_unit {
            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
        }
    }
}

impl From<Constraints> for protobuf::Constraints {
    fn from(value: Constraints) -> Self {
        let constraints = value.into_iter().map(|item| item.into()).collect();
        protobuf::Constraints { constraints }
    }
}

impl From<Constraint> for protobuf::Constraint {
    fn from(value: Constraint) -> Self {
        let res = match value {
            Constraint::PrimaryKey(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::PrimaryKey(
                    protobuf::PrimaryKeyConstraint { indices },
                )
            }
            Constraint::Unique(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::PrimaryKey(
                    protobuf::PrimaryKeyConstraint { indices },
                )
            }
        };
        protobuf::Constraint {
            constraint_mode: Some(res),
        }
    }
}

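// Statistics precision values are encoded as a `PrecisionInfo` tag
// (Exact / Inexact / Absent) together with an optional scalar payload.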
impl From<&Precision<usize>> for protobuf::Precision {
    fn from(s: &Precision<usize>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Statistics> for protobuf::Statistics {
    fn from(s: &Statistics) -> protobuf::Statistics {
        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
        protobuf::Statistics {
            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
            column_stats,
        }
    }
}

impl From<&ColumnStatistics> for protobuf::ColumnStats {
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
        }
    }
}

impl From<JoinSide> for protobuf::JoinSide {
    fn from(t: JoinSide) -> Self {
        match t {
            JoinSide::Left => protobuf::JoinSide::LeftSide,
            JoinSide::Right => protobuf::JoinSide::RightSide,
            JoinSide::None => protobuf::JoinSide::None,
        }
    }
}

impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
    fn from(value: &CompressionTypeVariant) -> Self {
        match value {
            CompressionTypeVariant::GZIP => Self::Gzip,
            CompressionTypeVariant::BZIP2 => Self::Bzip2,
            CompressionTypeVariant::XZ => Self::Xz,
            CompressionTypeVariant::ZSTD => Self::Zstd,
            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
        }
    }
}

impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}

impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &JsonWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonWriterOptions {
            compression: compression.into(),
        })
    }
}

impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
            max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
        })
    }
}

impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}

impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;
    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}

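// Optional single-byte CSV settings (header flag, terminator, escape, and so
// on) are encoded as byte vectors that are either empty (unset) or one byte long.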
impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError; // Define or use an appropriate error type

    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
            truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
        })
    }
}

impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
    type Error = DataFusionError;

    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonOptions {
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
        })
    }
}

/// Creates a scalar protobuf value from an optional value,
/// encoding `None` as a typed null of the given datatype
fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
    v: Option<&I>,
    null_arrow_type: &DataType,
    constructor: T,
) -> Result<protobuf::ScalarValue, Error> {
    let value = v
        .map(constructor)
        .unwrap_or(protobuf::scalar_value::Value::NullValue(
            null_arrow_type.try_into()?,
        ));

    Ok(protobuf::ScalarValue { value: Some(value) })
}

// ScalarValue::List / FixedSizeList / LargeList / Struct / Map are serialized using
// Arrow IPC messages as a single column RecordBatch
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    let gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let write_options = IpcWriteOptions::default();
    let mut compression_context = CompressionContext::default();
    let (encoded_dictionaries, encoded_message) = gen
        .encode(
            &batch,
            &mut dict_tracker,
            &write_options,
            &mut compression_context,
        )
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        _ => unreachable!(),
    }
}

/// Converts an iterator of `Arc<Field>`s to `protobuf::Field`s
fn convert_arc_fields_to_proto_fields<'a, I>(
    fields: I,
) -> Result<Vec<protobuf::Field>, Error>
where
    I: IntoIterator<Item = &'a Arc<Field>>,
{
    fields
        .into_iter()
        .map(|field| field.as_ref().try_into())
        .collect::<Result<Vec<_>, Error>>()
}

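/// Converts an Arrow CSV [`WriterBuilder`] and a compression variant into the
/// protobuf `CsvWriterOptions` message.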
pub(crate) fn csv_writer_options_to_proto(
    csv_options: &WriterBuilder,
    compression: &CompressionTypeVariant,
) -> protobuf::CsvWriterOptions {
    let compression: protobuf::CompressionTypeVariant = compression.into();
    protobuf::CsvWriterOptions {
        compression: compression.into(),
        delimiter: (csv_options.delimiter() as char).to_string(),
        has_header: csv_options.header(),
        date_format: csv_options.date_format().unwrap_or("").to_owned(),
        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
        time_format: csv_options.time_format().unwrap_or("").to_owned(),
        null_value: csv_options.null().to_owned(),
        quote: (csv_options.quote() as char).to_string(),
        escape: (csv_options.escape() as char).to_string(),
        double_quote: csv_options.double_quote(),
    }
}
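
// Illustrative sketch only (not part of the upstream test suite): exercises a
// couple of the conversions defined above, using only the `TryFrom` impls in
// this module.
#[cfg(test)]
mod conversion_examples {
    use super::*;

    #[test]
    fn data_type_to_proto() {
        // `DataType::Int32` maps to the `Int32` variant of the `ArrowTypeEnum` oneof.
        let proto: protobuf::ArrowType = (&DataType::Int32).try_into().unwrap();
        assert!(matches!(
            proto.arrow_type_enum,
            Some(ArrowTypeEnum::Int32(EmptyMessage {}))
        ));
    }

    #[test]
    fn none_scalar_encodes_as_typed_null() {
        // A `None` scalar is encoded as `Value::NullValue` carrying its arrow type.
        let proto: protobuf::ScalarValue =
            (&ScalarValue::Int32(None)).try_into().unwrap();
        assert!(matches!(proto.value, Some(Value::NullValue(_))));
    }
}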