datafusion_proto_common/to_proto/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23    EmptyMessage, arrow_type::ArrowTypeEnum, scalar_value::Value,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29    SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{
32    CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
33};
34use datafusion_common::{
35    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
36    DataFusionError, JoinSide, ScalarValue, Statistics,
37    config::{
38        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
39        TableParquetOptions,
40    },
41    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
42    parsers::CompressionTypeVariant,
43    plan_datafusion_err,
44    stats::Precision,
45};
46
47#[derive(Debug)]
48pub enum Error {
49    General(String),
50
51    InvalidScalarValue(ScalarValue),
52
53    InvalidScalarType(DataType),
54
55    InvalidTimeUnit(TimeUnit),
56
57    NotImplemented(String),
58}
59
// Marker impl so `Error` can be used as a standard error (e.g. behind
// `dyn std::error::Error`); `Debug` and `Display` supply the required methods.
impl std::error::Error for Error {}
61
62impl std::fmt::Display for Error {
63    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64        match self {
65            Self::General(desc) => write!(f, "General error: {desc}"),
66            Self::InvalidScalarValue(value) => {
67                write!(f, "{value:?} is invalid as a DataFusion scalar value")
68            }
69            Self::InvalidScalarType(data_type) => {
70                write!(f, "{data_type} is invalid as a DataFusion scalar type")
71            }
72            Self::InvalidTimeUnit(time_unit) => {
73                write!(
74                    f,
75                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
76                )
77            }
78            Self::NotImplemented(s) => {
79                write!(f, "Not implemented: {s}")
80            }
81        }
82    }
83}
84
85impl From<Error> for DataFusionError {
86    fn from(e: Error) -> Self {
87        plan_datafusion_err!("{}", e)
88    }
89}
90
91impl TryFrom<&Field> for protobuf::Field {
92    type Error = Error;
93
94    fn try_from(field: &Field) -> Result<Self, Self::Error> {
95        let arrow_type = field.data_type().try_into()?;
96        Ok(Self {
97            name: field.name().to_owned(),
98            arrow_type: Some(Box::new(arrow_type)),
99            nullable: field.is_nullable(),
100            children: Vec::new(),
101            metadata: field.metadata().clone(),
102        })
103    }
104}
105
106impl TryFrom<&DataType> for protobuf::ArrowType {
107    type Error = Error;
108
109    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
110        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
111        Ok(Self {
112            arrow_type_enum: Some(arrow_type_enum),
113        })
114    }
115}
116
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    /// Serializes an Arrow [`DataType`] into the protobuf `ArrowTypeEnum` oneof.
    ///
    /// Unit-like types (integers, floats, dates, strings, ...) map to an
    /// `EmptyMessage` variant; parameterized types recurse into their child
    /// fields via `TryFrom<&Field>`. `RunEndEncoded`, `ListView` and
    /// `LargeListView` are rejected as not yet supported.
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            // A missing timezone is serialized as the empty string.
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            // Time/duration types carry only their unit, as the enum's i32 tag.
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            // Container types recursively serialize their element field(s).
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                // union_types and type_ids are built from the same iteration
                // order, so the i-th type id describes the i-th field.
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(fields.iter().map(|(_, item)|item))?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            // Decimal widths only carry (precision, scale); the width is
            // implied by the chosen oneof variant.
            DataType::Decimal32(precision, scale) => Self::Decimal32(protobuf::Decimal32Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal64(precision, scale) => Self::Decimal64(protobuf::Decimal64Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal128(precision, scale) => Self::Decimal128(protobuf::Decimal128Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => {
                Self::Map(Box::new(
                    protobuf::Map {
                        field_type: Some(Box::new(field.as_ref().try_into()?)),
                        keys_sorted: *sorted,
                    }
                ))
            }
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
            }
        };

        Ok(res)
    }
}
231
232impl From<Column> for protobuf::Column {
233    fn from(c: Column) -> Self {
234        Self {
235            relation: c.relation.map(|relation| protobuf::ColumnRelation {
236                relation: relation.to_string(),
237            }),
238            name: c.name,
239        }
240    }
241}
242
243impl From<&Column> for protobuf::Column {
244    fn from(c: &Column) -> Self {
245        c.clone().into()
246    }
247}
248
249impl TryFrom<&Schema> for protobuf::Schema {
250    type Error = Error;
251
252    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
253        Ok(Self {
254            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
255            metadata: schema.metadata.clone(),
256        })
257    }
258}
259
260impl TryFrom<SchemaRef> for protobuf::Schema {
261    type Error = Error;
262
263    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
264        Ok(Self {
265            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
266            metadata: schema.metadata.clone(),
267        })
268    }
269}
270
271impl TryFrom<&DFSchema> for protobuf::DfSchema {
272    type Error = Error;
273
274    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
275        let columns = s
276            .iter()
277            .map(|(qualifier, field)| {
278                Ok(protobuf::DfField {
279                    field: Some(field.as_ref().try_into()?),
280                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
281                        relation: r.to_string(),
282                    }),
283                })
284            })
285            .collect::<Result<Vec<_>, Error>>()?;
286        Ok(Self {
287            columns,
288            metadata: s.metadata().clone(),
289        })
290    }
291}
292
293impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
294    type Error = Error;
295
296    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
297        s.as_ref().try_into()
298    }
299}
300
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    /// Serializes a DataFusion [`ScalarValue`] into its protobuf message.
    ///
    /// Primitive variants go through the `create_proto_scalar` helper and
    /// nested variants (lists, structs, maps) through
    /// `encode_scalar_nested_value`; both helpers are defined elsewhere in
    /// this module (the latter presumably uses the Arrow IPC writer imports
    /// above — confirm against the helper's definition).
    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        // Captured up-front so null scalars can still be encoded with their
        // concrete data type (`Value::NullValue` carries the type).
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            // Float16 has no dedicated wire field; it is widened to f32.
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            // 8/16-bit integers are widened to the proto's 32-bit fields.
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            // Nested scalars are delegated to the array-based encoder; the
            // backing array is cloned into an `ArrayRef` first.
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            // Timestamps keep their optional timezone; a missing timezone is
            // serialized as the empty string.
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            // Decimals ship the integer value as big-endian bytes alongside
            // precision (`p`) and scale (`s`); null decimals become a typed
            // NullValue.
            ScalarValue::Decimal32(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal32Value(protobuf::Decimal32 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal64(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal64Value(protobuf::Decimal64 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            // The untyped null scalar still records its data type on the wire.
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            // Time-of-day scalars: the inner oneof distinguishes the unit.
            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            // Packed intervals are split into their component parts before
            // serialization.
            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            // Unions serialize the full field list (so the type can be
            // reconstructed), the mode (Sparse = 0, Dense = 1), and the
            // selected value if present; value_id defaults to 0 when the
            // union value is null.
            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            // Dictionary scalars recursively serialize the inner value and
            // record the dictionary's index type.
            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}
686
687impl From<&TimeUnit> for protobuf::TimeUnit {
688    fn from(val: &TimeUnit) -> Self {
689        match val {
690            TimeUnit::Second => protobuf::TimeUnit::Second,
691            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
692            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
693            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
694        }
695    }
696}
697
698impl From<&IntervalUnit> for protobuf::IntervalUnit {
699    fn from(interval_unit: &IntervalUnit) -> Self {
700        match interval_unit {
701            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
702            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
703            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
704        }
705    }
706}
707
708impl From<Constraints> for protobuf::Constraints {
709    fn from(value: Constraints) -> Self {
710        let constraints = value.into_iter().map(|item| item.into()).collect();
711        protobuf::Constraints { constraints }
712    }
713}
714
715impl From<Constraint> for protobuf::Constraint {
716    fn from(value: Constraint) -> Self {
717        let res = match value {
718            Constraint::PrimaryKey(indices) => {
719                let indices = indices.into_iter().map(|item| item as u64).collect();
720                protobuf::constraint::ConstraintMode::PrimaryKey(
721                    protobuf::PrimaryKeyConstraint { indices },
722                )
723            }
724            Constraint::Unique(indices) => {
725                let indices = indices.into_iter().map(|item| item as u64).collect();
726                protobuf::constraint::ConstraintMode::PrimaryKey(
727                    protobuf::PrimaryKeyConstraint { indices },
728                )
729            }
730        };
731        protobuf::Constraint {
732            constraint_mode: Some(res),
733        }
734    }
735}
736
737impl From<&Precision<usize>> for protobuf::Precision {
738    fn from(s: &Precision<usize>) -> protobuf::Precision {
739        match s {
740            Precision::Exact(val) => protobuf::Precision {
741                precision_info: protobuf::PrecisionInfo::Exact.into(),
742                val: Some(crate::protobuf_common::ScalarValue {
743                    value: Some(Value::Uint64Value(*val as u64)),
744                }),
745            },
746            Precision::Inexact(val) => protobuf::Precision {
747                precision_info: protobuf::PrecisionInfo::Inexact.into(),
748                val: Some(crate::protobuf_common::ScalarValue {
749                    value: Some(Value::Uint64Value(*val as u64)),
750                }),
751            },
752            Precision::Absent => protobuf::Precision {
753                precision_info: protobuf::PrecisionInfo::Absent.into(),
754                val: Some(crate::protobuf_common::ScalarValue { value: None }),
755            },
756        }
757    }
758}
759
760impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
761    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
762        match s {
763            Precision::Exact(val) => protobuf::Precision {
764                precision_info: protobuf::PrecisionInfo::Exact.into(),
765                val: val.try_into().ok(),
766            },
767            Precision::Inexact(val) => protobuf::Precision {
768                precision_info: protobuf::PrecisionInfo::Inexact.into(),
769                val: val.try_into().ok(),
770            },
771            Precision::Absent => protobuf::Precision {
772                precision_info: protobuf::PrecisionInfo::Absent.into(),
773                val: Some(crate::protobuf_common::ScalarValue { value: None }),
774            },
775        }
776    }
777}
778
779impl From<&Statistics> for protobuf::Statistics {
780    fn from(s: &Statistics) -> protobuf::Statistics {
781        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
782        protobuf::Statistics {
783            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
784            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
785            column_stats,
786        }
787    }
788}
789
790impl From<&ColumnStatistics> for protobuf::ColumnStats {
791    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
792        protobuf::ColumnStats {
793            min_value: Some(protobuf::Precision::from(&s.min_value)),
794            max_value: Some(protobuf::Precision::from(&s.max_value)),
795            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
796            null_count: Some(protobuf::Precision::from(&s.null_count)),
797            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
798            byte_size: Some(protobuf::Precision::from(&s.byte_size)),
799        }
800    }
801}
802
803impl From<JoinSide> for protobuf::JoinSide {
804    fn from(t: JoinSide) -> Self {
805        match t {
806            JoinSide::Left => protobuf::JoinSide::LeftSide,
807            JoinSide::Right => protobuf::JoinSide::RightSide,
808            JoinSide::None => protobuf::JoinSide::None,
809        }
810    }
811}
812
813impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
814    fn from(value: &CompressionTypeVariant) -> Self {
815        match value {
816            CompressionTypeVariant::GZIP => Self::Gzip,
817            CompressionTypeVariant::BZIP2 => Self::Bzip2,
818            CompressionTypeVariant::XZ => Self::Xz,
819            CompressionTypeVariant::ZSTD => Self::Zstd,
820            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
821        }
822    }
823}
824
825impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
826    type Error = DataFusionError;
827
828    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
829        Ok(csv_writer_options_to_proto(
830            &opts.writer_options,
831            &opts.compression,
832        ))
833    }
834}
835
836impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
837    type Error = DataFusionError;
838
839    fn try_from(
840        opts: &JsonWriterOptions,
841    ) -> datafusion_common::Result<Self, Self::Error> {
842        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
843        Ok(protobuf::JsonWriterOptions {
844            compression: compression.into(),
845        })
846    }
847}
848
/// Converts session-level [`ParquetOptions`] into their protobuf
/// representation.
///
/// The mapping is a field-for-field copy: `usize` values are widened to
/// `u64` for the wire format, and optional settings are wrapped in the
/// prost-generated `*Opt` oneof wrappers (`None` == "not set").
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            // --- reader-side options ---
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            force_filter_selections: value.force_filter_selections,
            // --- writer-side options ---
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.to_string(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            // --- bloom filter options ---
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            // --- parallelism / buffering ---
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            // --- schema / type coercion ---
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
            max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
        })
    }
}
889
/// Converts per-column Parquet writer overrides into protobuf.
///
/// Every field is optional: a `None` here means "no column-specific
/// override, fall back to the global [`ParquetOptions`]". Each present
/// value is wrapped in its prost-generated `*Opt` oneof wrapper.
impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}
924
925impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
926    type Error = DataFusionError;
927    fn try_from(
928        value: &TableParquetOptions,
929    ) -> datafusion_common::Result<Self, Self::Error> {
930        let column_specific_options = value
931            .column_specific_options
932            .iter()
933            .map(|(k, v)| {
934                Ok(protobuf::ParquetColumnSpecificOptions {
935                    column_name: k.into(),
936                    options: Some(v.try_into()?),
937                })
938            })
939            .collect::<datafusion_common::Result<Vec<_>>>()?;
940        let key_value_metadata = value
941            .key_value_metadata
942            .iter()
943            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
944            .collect::<HashMap<String, String>>();
945        Ok(protobuf::TableParquetOptions {
946            global: Some((&value.global).try_into()?),
947            column_specific_options,
948            key_value_metadata,
949        })
950    }
951}
952
953impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
954    type Error = DataFusionError; // Define or use an appropriate error type
955
956    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
957        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
958        Ok(protobuf::CsvOptions {
959            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
960            delimiter: vec![opts.delimiter],
961            quote: vec![opts.quote],
962            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
963            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
964            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
965            newlines_in_values: opts
966                .newlines_in_values
967                .map_or_else(Vec::new, |h| vec![h as u8]),
968            compression: compression.into(),
969            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
970            date_format: opts.date_format.clone().unwrap_or_default(),
971            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
972            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
973            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
974            time_format: opts.time_format.clone().unwrap_or_default(),
975            null_value: opts.null_value.clone().unwrap_or_default(),
976            null_regex: opts.null_regex.clone().unwrap_or_default(),
977            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
978            truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
979            compression_level: opts.compression_level,
980        })
981    }
982}
983
984impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
985    type Error = DataFusionError;
986
987    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
988        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
989        Ok(protobuf::JsonOptions {
990            compression: compression.into(),
991            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
992            compression_level: opts.compression_level,
993        })
994    }
995}
996
997/// Creates a scalar protobuf value from an optional value (T), and
998/// encoding None as the appropriate datatype
999fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
1000    v: Option<&I>,
1001    null_arrow_type: &DataType,
1002    constructor: T,
1003) -> Result<protobuf::ScalarValue, Error> {
1004    let value = v
1005        .map(constructor)
1006        .unwrap_or(protobuf::scalar_value::Value::NullValue(
1007            null_arrow_type.try_into()?,
1008        ));
1009
1010    Ok(protobuf::ScalarValue { value: Some(value) })
1011}
1012
// ScalarValue::List / FixedSizeList / LargeList / Struct / Map are serialized using
// Arrow IPC messages as a single column RecordBatch.
//
// `arr` is the single-element array backing the nested scalar, and `val` is
// the original ScalarValue, used only to pick which protobuf variant wraps
// the encoded payload. Returns an error if the temporary batch cannot be
// built or the IPC encoding fails. Callers must only pass the five nested
// variants listed above; any other variant hits `unreachable!`.
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    // Wrap the array in a one-column batch so the IPC writer machinery can
    // serialize it; the column name is arbitrary and not read back.
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    // Fresh encoder state per call: no dictionary-id errors across calls
    // (tracker created with `false`), default write options, default
    // compression context.
    let ipc_gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let write_options = IpcWriteOptions::default();
    let mut compression_context = CompressionContext::default();
    let (encoded_dictionaries, encoded_message) = ipc_gen
        .encode(
            &batch,
            &mut dict_tracker,
            &write_options,
            &mut compression_context,
        )
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    // The schema travels alongside the IPC bytes so decoding does not depend
    // on external context.
    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        // Dictionary batches (for dictionary-encoded children) are carried
        // separately, in encoding order.
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    // The payload encoding is identical for all five variants; only the
    // protobuf oneof wrapper differs.
    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        // Contract: callers only invoke this for the nested variants above.
        _ => unreachable!(),
    }
}
1080
1081/// Converts a vector of `Arc<arrow::Field>`s to `protobuf::Field`s
1082fn convert_arc_fields_to_proto_fields<'a, I>(
1083    fields: I,
1084) -> Result<Vec<protobuf::Field>, Error>
1085where
1086    I: IntoIterator<Item = &'a Arc<Field>>,
1087{
1088    fields
1089        .into_iter()
1090        .map(|field| field.as_ref().try_into())
1091        .collect::<Result<Vec<_>, Error>>()
1092}
1093
1094pub(crate) fn csv_writer_options_to_proto(
1095    csv_options: &WriterBuilder,
1096    compression: &CompressionTypeVariant,
1097) -> protobuf::CsvWriterOptions {
1098    let compression: protobuf::CompressionTypeVariant = compression.into();
1099    protobuf::CsvWriterOptions {
1100        compression: compression.into(),
1101        delimiter: (csv_options.delimiter() as char).to_string(),
1102        has_header: csv_options.header(),
1103        date_format: csv_options.date_format().unwrap_or("").to_owned(),
1104        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
1105        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
1106        time_format: csv_options.time_format().unwrap_or("").to_owned(),
1107        null_value: csv_options.null().to_owned(),
1108        quote: (csv_options.quote() as char).to_string(),
1109        escape: (csv_options.escape() as char).to_string(),
1110        double_quote: csv_options.double_quote(),
1111    }
1112}