Skip to main content

datafusion_proto_common/to_proto/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23    EmptyMessage, arrow_type::ArrowTypeEnum, scalar_value::Value,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29    SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{
32    CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
33};
34use datafusion_common::{
35    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
36    DataFusionError, JoinSide, ScalarValue, Statistics,
37    config::{
38        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
39        TableParquetOptions,
40    },
41    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
42    parsers::CompressionTypeVariant,
43    plan_datafusion_err,
44    stats::Precision,
45};
46
/// Errors that can arise while serializing DataFusion / Arrow types to
/// their protobuf representations in this module.
#[derive(Debug)]
pub enum Error {
    /// Catch-all serialization failure with a free-form description.
    General(String),

    /// The scalar value has no valid protobuf encoding.
    InvalidScalarValue(ScalarValue),

    /// The data type cannot be used as a scalar type in the encoding.
    InvalidScalarType(DataType),

    /// The time unit is not accepted for the attempted conversion.
    InvalidTimeUnit(TimeUnit),

    /// The conversion is recognized but not implemented yet.
    NotImplemented(String),
}

// Marker impl: `Debug` (derived) and `Display` (below) satisfy the
// `std::error::Error` supertrait requirements.
impl std::error::Error for Error {}
61
62impl std::fmt::Display for Error {
63    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64        match self {
65            Self::General(desc) => write!(f, "General error: {desc}"),
66            Self::InvalidScalarValue(value) => {
67                write!(f, "{value:?} is invalid as a DataFusion scalar value")
68            }
69            Self::InvalidScalarType(data_type) => {
70                write!(f, "{data_type} is invalid as a DataFusion scalar type")
71            }
72            Self::InvalidTimeUnit(time_unit) => {
73                write!(
74                    f,
75                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
76                )
77            }
78            Self::NotImplemented(s) => {
79                write!(f, "Not implemented: {s}")
80            }
81        }
82    }
83}
84
impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        // Surface serialization failures as planning errors, reusing the
        // `Display` text of the original error.
        plan_datafusion_err!("{}", e)
    }
}
90
impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    /// Serializes an Arrow [`Field`]: name, data type, nullability and
    /// field metadata.
    ///
    /// `children` is left empty here: child fields of nested types are
    /// carried inside the encoded `arrow_type` (see the `ArrowTypeEnum`
    /// conversion below).
    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}
105
106impl TryFrom<&DataType> for protobuf::ArrowType {
107    type Error = Error;
108
109    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
110        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
111        Ok(Self {
112            arrow_type_enum: Some(arrow_type_enum),
113        })
114    }
115}
116
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    /// Maps an Arrow [`DataType`] onto the protobuf `ArrowTypeEnum` oneof.
    ///
    /// Parameterless types are encoded as `EmptyMessage`; parameterized
    /// types carry their parameters (and recursively converted child
    /// fields) in dedicated messages. `ListView` / `LargeListView` are not
    /// yet representable and return [`Error::General`].
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            // Parameterless types: the oneof variant alone identifies them.
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            // A missing timezone is encoded as an empty string.
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            // Time/duration/interval types carry only their unit, encoded
            // as the protobuf enum's i32 tag.
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            // Nested types: child fields are converted recursively.
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            // Union fields and their i8 type ids (widened to i32) are
            // serialized side by side.
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(
                        fields.iter().map(|(_, item)| item),
                    )?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            // Decimals: precision (u8) widens to u32, scale (i8) to i32.
            DataType::Decimal32(precision, scale) => {
                Self::Decimal32(protobuf::Decimal32Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal64(precision, scale) => {
                Self::Decimal64(protobuf::Decimal64Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal128(precision, scale) => {
                Self::Decimal128(protobuf::Decimal128Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal256(precision, scale) => {
                Self::Decimal256(protobuf::Decimal256Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Map(field, sorted) => Self::Map(Box::new(protobuf::Map {
                field_type: Some(Box::new(field.as_ref().try_into()?)),
                keys_sorted: *sorted,
            })),
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                Self::RunEndEncoded(Box::new(protobuf::RunEndEncoded {
                    run_ends_field: Some(Box::new(run_ends_field.as_ref().try_into()?)),
                    values_field: Some(Box::new(values_field.as_ref().try_into()?)),
                }))
            }
            // Not yet supported by the protobuf schema.
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!(
                    "Proto serialization error: {val} not yet supported"
                )));
            }
        };

        Ok(res)
    }
}
240
241impl From<Column> for protobuf::Column {
242    fn from(c: Column) -> Self {
243        Self {
244            relation: c.relation.map(|relation| protobuf::ColumnRelation {
245                relation: relation.to_string(),
246            }),
247            name: c.name,
248        }
249    }
250}
251
impl From<&Column> for protobuf::Column {
    fn from(c: &Column) -> Self {
        // Delegate to the owned conversion via a clone.
        c.clone().into()
    }
}
257
impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    /// Serializes an Arrow [`Schema`]: all fields plus the schema-level
    /// metadata map.
    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}
268
269impl TryFrom<SchemaRef> for protobuf::Schema {
270    type Error = Error;
271
272    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
273        Ok(Self {
274            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
275            metadata: schema.metadata.clone(),
276        })
277    }
278}
279
280impl TryFrom<&DFSchema> for protobuf::DfSchema {
281    type Error = Error;
282
283    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
284        let columns = s
285            .iter()
286            .map(|(qualifier, field)| {
287                Ok(protobuf::DfField {
288                    field: Some(field.as_ref().try_into()?),
289                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
290                        relation: r.to_string(),
291                    }),
292                })
293            })
294            .collect::<Result<Vec<_>, Error>>()?;
295        Ok(Self {
296            columns,
297            metadata: s.metadata().clone(),
298        })
299    }
300}
301
impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    /// Delegates to the `&DFSchema` conversion above.
    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}
309
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    /// Serializes a DataFusion [`ScalarValue`] into its protobuf message.
    ///
    /// Simple values go through `create_proto_scalar` — presumably a
    /// helper defined elsewhere in this module that encodes a `None`
    /// inner value as a typed null — while nested values (lists, structs,
    /// maps) are delegated wholesale to `encode_scalar_nested_value`.
    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        // The scalar's own data type; used to encode typed nulls.
        let data_type = val.data_type();
        match val {
            // Primitives. Integer types narrower than 32 bits are widened
            // to i32/u32 because protobuf has no smaller varint fields.
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            // Float16 is widened to f32: the proto has no half-float value.
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            // String variants: each string flavor keeps its own proto value.
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            // Nested scalars are serialized as whole Arrow arrays.
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            // Timestamps: a missing timezone is encoded as "".
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            // Decimals: the native integer is serialized as its big-endian
            // byte representation; precision and scale widen to i64.
            ScalarValue::Decimal32(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal32Value(protobuf::Decimal32 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal64(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal64Value(protobuf::Decimal64 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            // The untyped null scalar still records its data type.
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            // Binary variants, parallel to the string variants above.
            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            // Time-of-day values: the inner oneof records the unit.
            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            // Intervals are decomposed into their component parts before
            // serialization; a None payload becomes a typed null.
            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            // Durations: plain i64 payloads per unit; None → typed null.
            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            // Union scalars serialize all member fields plus the active
            // value. Note: when `val` is None, `value_id` defaults to 0.
            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            // Dictionary scalars: recursively serialize the inner value and
            // record the dictionary's key (index) type.
            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }

            // Run-end-encoded scalars carry both field definitions plus the
            // recursively serialized inner value.
            ScalarValue::RunEndEncoded(run_ends_field, values_field, val) => {
                Ok(protobuf::ScalarValue {
                    value: Some(Value::RunEndEncodedValue(Box::new(
                        protobuf::ScalarRunEndEncodedValue {
                            run_ends_field: Some(run_ends_field.as_ref().try_into()?),
                            values_field: Some(values_field.as_ref().try_into()?),
                            value: Some(Box::new(val.as_ref().try_into()?)),
                        },
                    ))),
                })
            }
        }
    }
}
707
impl From<&TimeUnit> for protobuf::TimeUnit {
    /// One-to-one mapping between Arrow and protobuf time units.
    fn from(val: &TimeUnit) -> Self {
        match val {
            TimeUnit::Second => protobuf::TimeUnit::Second,
            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
        }
    }
}
718
impl From<&IntervalUnit> for protobuf::IntervalUnit {
    /// One-to-one mapping between Arrow and protobuf interval units.
    fn from(interval_unit: &IntervalUnit) -> Self {
        match interval_unit {
            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
        }
    }
}
728
729impl From<Constraints> for protobuf::Constraints {
730    fn from(value: Constraints) -> Self {
731        let constraints = value.into_iter().map(|item| item.into()).collect();
732        protobuf::Constraints { constraints }
733    }
734}
735
736impl From<Constraint> for protobuf::Constraint {
737    fn from(value: Constraint) -> Self {
738        let res = match value {
739            Constraint::PrimaryKey(indices) => {
740                let indices = indices.into_iter().map(|item| item as u64).collect();
741                protobuf::constraint::ConstraintMode::PrimaryKey(
742                    protobuf::PrimaryKeyConstraint { indices },
743                )
744            }
745            Constraint::Unique(indices) => {
746                let indices = indices.into_iter().map(|item| item as u64).collect();
747                protobuf::constraint::ConstraintMode::PrimaryKey(
748                    protobuf::PrimaryKeyConstraint { indices },
749                )
750            }
751        };
752        protobuf::Constraint {
753            constraint_mode: Some(res),
754        }
755    }
756}
757
758impl From<&Precision<usize>> for protobuf::Precision {
759    fn from(s: &Precision<usize>) -> protobuf::Precision {
760        match s {
761            Precision::Exact(val) => protobuf::Precision {
762                precision_info: protobuf::PrecisionInfo::Exact.into(),
763                val: Some(crate::protobuf_common::ScalarValue {
764                    value: Some(Value::Uint64Value(*val as u64)),
765                }),
766            },
767            Precision::Inexact(val) => protobuf::Precision {
768                precision_info: protobuf::PrecisionInfo::Inexact.into(),
769                val: Some(crate::protobuf_common::ScalarValue {
770                    value: Some(Value::Uint64Value(*val as u64)),
771                }),
772            },
773            Precision::Absent => protobuf::Precision {
774                precision_info: protobuf::PrecisionInfo::Absent.into(),
775                val: Some(crate::protobuf_common::ScalarValue { value: None }),
776            },
777        }
778    }
779}
780
781impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
782    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
783        match s {
784            Precision::Exact(val) => protobuf::Precision {
785                precision_info: protobuf::PrecisionInfo::Exact.into(),
786                val: val.try_into().ok(),
787            },
788            Precision::Inexact(val) => protobuf::Precision {
789                precision_info: protobuf::PrecisionInfo::Inexact.into(),
790                val: val.try_into().ok(),
791            },
792            Precision::Absent => protobuf::Precision {
793                precision_info: protobuf::PrecisionInfo::Absent.into(),
794                val: Some(crate::protobuf_common::ScalarValue { value: None }),
795            },
796        }
797    }
798}
799
800impl From<&Statistics> for protobuf::Statistics {
801    fn from(s: &Statistics) -> protobuf::Statistics {
802        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
803        protobuf::Statistics {
804            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
805            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
806            column_stats,
807        }
808    }
809}
810
811impl From<&ColumnStatistics> for protobuf::ColumnStats {
812    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
813        protobuf::ColumnStats {
814            min_value: Some(protobuf::Precision::from(&s.min_value)),
815            max_value: Some(protobuf::Precision::from(&s.max_value)),
816            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
817            null_count: Some(protobuf::Precision::from(&s.null_count)),
818            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
819            byte_size: Some(protobuf::Precision::from(&s.byte_size)),
820        }
821    }
822}
823
824impl From<JoinSide> for protobuf::JoinSide {
825    fn from(t: JoinSide) -> Self {
826        match t {
827            JoinSide::Left => protobuf::JoinSide::LeftSide,
828            JoinSide::Right => protobuf::JoinSide::RightSide,
829            JoinSide::None => protobuf::JoinSide::None,
830        }
831    }
832}
833
834impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
835    fn from(value: &CompressionTypeVariant) -> Self {
836        match value {
837            CompressionTypeVariant::GZIP => Self::Gzip,
838            CompressionTypeVariant::BZIP2 => Self::Bzip2,
839            CompressionTypeVariant::XZ => Self::Xz,
840            CompressionTypeVariant::ZSTD => Self::Zstd,
841            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
842        }
843    }
844}
845
846impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
847    type Error = DataFusionError;
848
849    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
850        Ok(csv_writer_options_to_proto(
851            &opts.writer_options,
852            &opts.compression,
853        ))
854    }
855}
856
857impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
858    type Error = DataFusionError;
859
860    fn try_from(
861        opts: &JsonWriterOptions,
862    ) -> datafusion_common::Result<Self, Self::Error> {
863        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
864        Ok(protobuf::JsonWriterOptions {
865            compression: compression.into(),
866        })
867    }
868}
869
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    /// Serializes session-level [`ParquetOptions`] into the protobuf message.
    ///
    /// Field-by-field mirror of the config struct: optional settings are
    /// wrapped in prost `oneof` wrapper variants (the `*_opt` fields, `None`
    /// when unset), `usize` sizes/limits are widened to `u64`, and `String`
    /// settings are cloned. Despite returning `Result`, no arm below can
    /// currently fail.
    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            // Reader-side toggles.
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            force_filter_selections: value.force_filter_selections,
            // Writer-side settings.
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.to_string(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            // Bloom filter configuration.
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            // Parallelism / buffering knobs.
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            // Schema/type coercion behavior.
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
            max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
        })
    }
}
910
impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    /// Serializes per-column parquet overrides.
    ///
    /// Every setting is optional; each `Some` value is wrapped in the
    /// corresponding prost `oneof` wrapper variant, and `None` is carried as
    /// an unset `*_opt` field. Despite the `Result` signature, no conversion
    /// below can currently fail.
    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}
945
946impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
947    type Error = DataFusionError;
948    fn try_from(
949        value: &TableParquetOptions,
950    ) -> datafusion_common::Result<Self, Self::Error> {
951        let column_specific_options = value
952            .column_specific_options
953            .iter()
954            .map(|(k, v)| {
955                Ok(protobuf::ParquetColumnSpecificOptions {
956                    column_name: k.into(),
957                    options: Some(v.try_into()?),
958                })
959            })
960            .collect::<datafusion_common::Result<Vec<_>>>()?;
961        let key_value_metadata = value
962            .key_value_metadata
963            .iter()
964            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
965            .collect::<HashMap<String, String>>();
966        Ok(protobuf::TableParquetOptions {
967            global: Some((&value.global).try_into()?),
968            column_specific_options,
969            key_value_metadata,
970        })
971    }
972}
973
974impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
975    type Error = DataFusionError; // Define or use an appropriate error type
976
977    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
978        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
979        Ok(protobuf::CsvOptions {
980            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
981            delimiter: vec![opts.delimiter],
982            quote: vec![opts.quote],
983            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
984            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
985            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
986            newlines_in_values: opts
987                .newlines_in_values
988                .map_or_else(Vec::new, |h| vec![h as u8]),
989            compression: compression.into(),
990            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
991            date_format: opts.date_format.clone().unwrap_or_default(),
992            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
993            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
994            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
995            time_format: opts.time_format.clone().unwrap_or_default(),
996            null_value: opts.null_value.clone().unwrap_or_default(),
997            null_regex: opts.null_regex.clone().unwrap_or_default(),
998            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
999            truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
1000            compression_level: opts.compression_level,
1001        })
1002    }
1003}
1004
1005impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
1006    type Error = DataFusionError;
1007
1008    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
1009        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
1010        Ok(protobuf::JsonOptions {
1011            compression: compression.into(),
1012            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
1013            compression_level: opts.compression_level,
1014            newline_delimited: Some(opts.newline_delimited),
1015        })
1016    }
1017}
1018
1019/// Creates a scalar protobuf value from an optional value (T), and
1020/// encoding None as the appropriate datatype
1021fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
1022    v: Option<&I>,
1023    null_arrow_type: &DataType,
1024    constructor: T,
1025) -> Result<protobuf::ScalarValue, Error> {
1026    let value = v
1027        .map(constructor)
1028        .unwrap_or(protobuf::scalar_value::Value::NullValue(
1029            null_arrow_type.try_into()?,
1030        ));
1031
1032    Ok(protobuf::ScalarValue { value: Some(value) })
1033}
1034
// Nested ScalarValue types (List / FixedSizeList / LargeList / Struct / Map) are serialized using
// Arrow IPC messages as a single column RecordBatch.
//
// `arr` must be the single-element array backing `val`, and `val` must be one
// of the nested variants matched at the bottom — any other variant is a
// caller bug and hits `unreachable!`.
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    // Wrap the array in a one-column batch; the column name is a placeholder
    // and the real field metadata travels in the serialized schema below.
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding nested ScalarValue: {e}"
        ))
    })?;

    let ipc_gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let write_options = IpcWriteOptions::default();
    // The IPC writer requires pre-allocated dictionary IDs (normally assigned when
    // serializing the schema). Populate `dict_tracker` by encoding the schema first;
    // the returned schema bytes are intentionally discarded — only the tracker
    // side effect is needed here.
    ipc_gen.schema_to_bytes_with_dictionary_tracker(
        batch.schema().as_ref(),
        &mut dict_tracker,
        &write_options,
    );
    let mut compression_context = CompressionContext::default();
    // Encode the batch body plus any dictionary batches it references.
    let (encoded_dictionaries, encoded_message) = ipc_gen
        .encode(
            &batch,
            &mut dict_tracker,
            &write_options,
            &mut compression_context,
        )
        .map_err(|e| {
            Error::General(format!("Error encoding nested ScalarValue as IPC: {e}"))
        })?;

    // Serialized separately so the reader can reconstruct field metadata
    // without parsing the IPC schema message.
    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    // Pick the protobuf oneof variant matching the scalar's own variant; the
    // payload is identical in every case.
    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        // Guarded by the callers, which only pass nested variants.
        _ => unreachable!(),
    }
}
1109
1110/// Converts a vector of `Arc<arrow::Field>`s to `protobuf::Field`s
1111fn convert_arc_fields_to_proto_fields<'a, I>(
1112    fields: I,
1113) -> Result<Vec<protobuf::Field>, Error>
1114where
1115    I: IntoIterator<Item = &'a Arc<Field>>,
1116{
1117    fields
1118        .into_iter()
1119        .map(|field| field.as_ref().try_into())
1120        .collect::<Result<Vec<_>, Error>>()
1121}
1122
1123pub(crate) fn csv_writer_options_to_proto(
1124    csv_options: &WriterBuilder,
1125    compression: &CompressionTypeVariant,
1126) -> protobuf::CsvWriterOptions {
1127    let compression: protobuf::CompressionTypeVariant = compression.into();
1128    protobuf::CsvWriterOptions {
1129        compression: compression.into(),
1130        delimiter: (csv_options.delimiter() as char).to_string(),
1131        has_header: csv_options.header(),
1132        date_format: csv_options.date_format().unwrap_or("").to_owned(),
1133        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
1134        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
1135        time_format: csv_options.time_format().unwrap_or("").to_owned(),
1136        null_value: csv_options.null().to_owned(),
1137        quote: (csv_options.quote() as char).to_string(),
1138        escape: (csv_options.escape() as char).to_string(),
1139        double_quote: csv_options.double_quote(),
1140    }
1141}