datafusion_proto_common/to_proto/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use crate::protobuf_common as protobuf;
use crate::protobuf_common::{
    arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{
    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
    SchemaRef, TimeUnit, UnionMode,
};
use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
use datafusion_common::{
    config::{
        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
        TableParquetOptions,
    },
    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
    parsers::CompressionTypeVariant,
    plan_datafusion_err,
    stats::Precision,
    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, JoinSide, ScalarValue, Statistics,
};

#[derive(Debug)]
pub enum Error {
    General(String),

    InvalidScalarValue(ScalarValue),

    InvalidScalarType(DataType),

    InvalidTimeUnit(TimeUnit),

    NotImplemented(String),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::General(desc) => write!(f, "General error: {desc}"),
            Self::InvalidScalarValue(value) => {
                write!(f, "{value:?} is invalid as a DataFusion scalar value")
            }
            Self::InvalidScalarType(data_type) => {
                write!(f, "{data_type:?} is invalid as a DataFusion scalar type")
            }
            Self::InvalidTimeUnit(time_unit) => {
                write!(
                    f,
                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
                )
            }
            Self::NotImplemented(s) => {
                write!(f, "Not implemented: {s}")
            }
        }
    }
}

impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}

impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}

impl TryFrom<&DataType> for protobuf::ArrowType {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
        Ok(Self {
            arrow_type_enum: Some(arrow_type_enum),
        })
    }
}

impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(
                        fields.iter().map(|(_, item)| item),
                    )?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal32(precision, scale) => Self::Decimal32(protobuf::Decimal32Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal64(precision, scale) => Self::Decimal64(protobuf::Decimal64Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal128(precision, scale) => Self::Decimal128(protobuf::Decimal128Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => {
                Self::Map(Box::new(
                    protobuf::Map {
                        field_type: Some(Box::new(field.as_ref().try_into()?)),
                        keys_sorted: *sorted,
                    }
                ))
            }
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
            }
        };

        Ok(res)
    }
}
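
// A small illustrative check, added here as a sketch rather than as part of
// the upstream test suite: a parameter-free primitive `DataType` maps onto
// the prost-generated enum variant of the same name, carrying an
// `EmptyMessage` payload.
#[cfg(test)]
mod arrow_type_to_proto_example {
    use super::*;

    #[test]
    fn int64_maps_to_int64_variant() {
        let proto: protobuf::ArrowType = (&DataType::Int64).try_into().unwrap();
        assert!(matches!(
            proto.arrow_type_enum,
            Some(ArrowTypeEnum::Int64(EmptyMessage {}))
        ));
    }
}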

impl From<Column> for protobuf::Column {
    fn from(c: Column) -> Self {
        Self {
            relation: c.relation.map(|relation| protobuf::ColumnRelation {
                relation: relation.to_string(),
            }),
            name: c.name,
        }
    }
}

impl From<&Column> for protobuf::Column {
    fn from(c: &Column) -> Self {
        c.clone().into()
    }
}

impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

impl TryFrom<SchemaRef> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}
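
// Illustrative sketch (an addition, not upstream code): field names and
// nullability carry over when an Arrow `Schema` is converted into the
// protobuf representation.
#[cfg(test)]
mod schema_to_proto_example {
    use super::*;

    #[test]
    fn schema_fields_carry_over() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let proto: protobuf::Schema = (&schema).try_into().unwrap();
        assert_eq!(proto.columns.len(), 2);
        assert_eq!(proto.columns[0].name, "id");
        assert!(!proto.columns[0].nullable);
        assert!(proto.columns[1].nullable);
    }
}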

impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}

impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}

impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}
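
// Illustrative sketch (an addition, not upstream code): a present scalar maps
// onto the matching protobuf variant, while `None` is encoded as a typed null
// so the original data type survives serialization.
#[cfg(test)]
mod scalar_value_to_proto_example {
    use super::*;

    #[test]
    fn int32_scalar_to_proto() {
        let proto: protobuf::ScalarValue =
            (&ScalarValue::Int32(Some(7))).try_into().unwrap();
        assert_eq!(proto.value, Some(Value::Int32Value(7)));

        let null_proto: protobuf::ScalarValue =
            (&ScalarValue::Int32(None)).try_into().unwrap();
        assert!(matches!(null_proto.value, Some(Value::NullValue(_))));
    }
}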

impl From<&TimeUnit> for protobuf::TimeUnit {
    fn from(val: &TimeUnit) -> Self {
        match val {
            TimeUnit::Second => protobuf::TimeUnit::Second,
            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
        }
    }
}

impl From<&IntervalUnit> for protobuf::IntervalUnit {
    fn from(interval_unit: &IntervalUnit) -> Self {
        match interval_unit {
            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
        }
    }
}

impl From<Constraints> for protobuf::Constraints {
    fn from(value: Constraints) -> Self {
        let constraints = value.into_iter().map(|item| item.into()).collect();
        protobuf::Constraints { constraints }
    }
}

impl From<Constraint> for protobuf::Constraint {
    fn from(value: Constraint) -> Self {
        let res = match value {
            Constraint::PrimaryKey(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::PrimaryKey(
                    protobuf::PrimaryKeyConstraint { indices },
                )
            }
            Constraint::Unique(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::Unique(
                    protobuf::UniqueConstraint { indices },
                )
            }
        };
        protobuf::Constraint {
            constraint_mode: Some(res),
        }
    }
}

impl From<&Precision<usize>> for protobuf::Precision {
    fn from(s: &Precision<usize>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}
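
// Illustrative sketch (an addition, not upstream code): exact and absent
// statistics map onto the corresponding `PrecisionInfo` discriminant, with
// exact counts carried as a u64 scalar value.
#[cfg(test)]
mod precision_to_proto_example {
    use super::*;

    #[test]
    fn row_count_precision_to_proto() {
        let exact = protobuf::Precision::from(&Precision::Exact(42usize));
        assert_eq!(exact.precision_info, protobuf::PrecisionInfo::Exact as i32);
        assert_eq!(
            exact.val.and_then(|v| v.value),
            Some(Value::Uint64Value(42))
        );

        let absent = protobuf::Precision::from(&Precision::<usize>::Absent);
        assert_eq!(absent.precision_info, protobuf::PrecisionInfo::Absent as i32);
    }
}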

impl From<&Statistics> for protobuf::Statistics {
    fn from(s: &Statistics) -> protobuf::Statistics {
        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
        protobuf::Statistics {
            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
            column_stats,
        }
    }
}

impl From<&ColumnStatistics> for protobuf::ColumnStats {
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
        }
    }
}

impl From<JoinSide> for protobuf::JoinSide {
    fn from(t: JoinSide) -> Self {
        match t {
            JoinSide::Left => protobuf::JoinSide::LeftSide,
            JoinSide::Right => protobuf::JoinSide::RightSide,
            JoinSide::None => protobuf::JoinSide::None,
        }
    }
}

impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
    fn from(value: &CompressionTypeVariant) -> Self {
        match value {
            CompressionTypeVariant::GZIP => Self::Gzip,
            CompressionTypeVariant::BZIP2 => Self::Bzip2,
            CompressionTypeVariant::XZ => Self::Xz,
            CompressionTypeVariant::ZSTD => Self::Zstd,
            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
        }
    }
}

impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}

impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &JsonWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonWriterOptions {
            compression: compression.into(),
        })
    }
}

impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
        })
    }
}

impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}

impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;
    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}
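
// Illustrative sketch (an addition, not upstream code): the default
// table-level Parquet options serialize without error, with per-column
// overrides flattened into a list of (column name, options) pairs.
#[cfg(test)]
mod table_parquet_options_example {
    use super::*;

    #[test]
    fn default_table_parquet_options_to_proto() {
        let opts = TableParquetOptions::default();
        let proto: protobuf::TableParquetOptions = (&opts).try_into().unwrap();
        assert!(proto.global.is_some());
        assert!(proto.column_specific_options.is_empty());
    }
}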

impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError;

    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
        })
    }
}
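
// Illustrative sketch (an addition, not upstream code): optional single-byte
// CSV settings such as `has_header` are encoded as zero- or one-element byte
// vectors in the protobuf message.
#[cfg(test)]
mod csv_options_to_proto_example {
    use super::*;

    #[test]
    fn optional_flags_become_byte_vectors() {
        let opts = CsvOptions {
            has_header: Some(true),
            ..Default::default()
        };
        let proto: protobuf::CsvOptions = (&opts).try_into().unwrap();
        assert_eq!(proto.has_header, vec![1u8]);
        assert_eq!(proto.delimiter, vec![opts.delimiter]);
    }
}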

impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
    type Error = DataFusionError;

    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonOptions {
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
        })
    }
}

/// Creates a protobuf scalar value from an optional value `T`, encoding
/// `None` as a typed null carrying the given data type
fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
    v: Option<&I>,
    null_arrow_type: &DataType,
    constructor: T,
) -> Result<protobuf::ScalarValue, Error> {
    let value = v
        .map(constructor)
        .unwrap_or(protobuf::scalar_value::Value::NullValue(
            null_arrow_type.try_into()?,
        ));

    Ok(protobuf::ScalarValue { value: Some(value) })
}
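
// Illustrative sketch (an addition, not upstream code): when the input is
// `None`, the helper produces a typed null rather than omitting the value, so
// the original Arrow data type can be reconstructed on deserialization.
#[cfg(test)]
mod create_proto_scalar_example {
    use super::*;

    #[test]
    fn none_becomes_typed_null() {
        let proto =
            create_proto_scalar(None, &DataType::Int64, |v: &i64| Value::Int64Value(*v))
                .unwrap();
        assert!(matches!(proto.value, Some(Value::NullValue(_))));
    }
}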

// ScalarValue::List / FixedSizeList / LargeList / Struct / Map are serialized
// using Arrow IPC messages as a single-column RecordBatch
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    let gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let (encoded_dictionaries, encoded_message) = gen
        .encoded_batch(&batch, &mut dict_tracker, &Default::default())
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        _ => unreachable!(),
    }
}
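
// Illustrative sketch (an addition, not upstream code): a list scalar is
// encoded through the IPC path above and lands in the `ListValue` protobuf
// variant, carrying the serialized single-column batch.
#[cfg(test)]
mod nested_scalar_encoding_example {
    use super::*;
    use arrow::array::ListArray;
    use arrow::datatypes::Int32Type;

    #[test]
    fn list_scalar_encodes_as_list_value() {
        let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
            Some(1),
            Some(2),
        ])]);
        let scalar = ScalarValue::List(Arc::new(arr));
        let proto: protobuf::ScalarValue = (&scalar).try_into().unwrap();
        assert!(matches!(
            proto.value,
            Some(protobuf::scalar_value::Value::ListValue(_))
        ));
    }
}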

/// Converts an iterator of `Arc<arrow::Field>`s to `protobuf::Field`s
fn convert_arc_fields_to_proto_fields<'a, I>(
    fields: I,
) -> Result<Vec<protobuf::Field>, Error>
where
    I: IntoIterator<Item = &'a Arc<Field>>,
{
    fields
        .into_iter()
        .map(|field| field.as_ref().try_into())
        .collect::<Result<Vec<_>, Error>>()
}

pub(crate) fn csv_writer_options_to_proto(
    csv_options: &WriterBuilder,
    compression: &CompressionTypeVariant,
) -> protobuf::CsvWriterOptions {
    let compression: protobuf::CompressionTypeVariant = compression.into();
    protobuf::CsvWriterOptions {
        compression: compression.into(),
        delimiter: (csv_options.delimiter() as char).to_string(),
        has_header: csv_options.header(),
        date_format: csv_options.date_format().unwrap_or("").to_owned(),
        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
        time_format: csv_options.time_format().unwrap_or("").to_owned(),
        null_value: csv_options.null().to_owned(),
        quote: (csv_options.quote() as char).to_string(),
        escape: (csv_options.escape() as char).to_string(),
        double_quote: csv_options.double_quote(),
    }
}
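
// Illustrative sketch (an addition, not upstream code): single-byte writer
// settings such as the delimiter are stringified, and the compression variant
// is stored as the prost enum's integer discriminant.
#[cfg(test)]
mod csv_writer_options_to_proto_example {
    use super::*;

    #[test]
    fn writer_builder_to_proto() {
        let builder = WriterBuilder::new().with_delimiter(b'|');
        let proto =
            csv_writer_options_to_proto(&builder, &CompressionTypeVariant::UNCOMPRESSED);
        assert_eq!(proto.delimiter, "|");
        assert_eq!(
            proto.compression,
            protobuf::CompressionTypeVariant::Uncompressed as i32
        );
    }
}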