use std::collections::HashMap;
use std::sync::Arc;

use crate::protobuf_common as protobuf;
use crate::protobuf_common::{
    EmptyMessage, arrow_type::ArrowTypeEnum, scalar_value::Value,
};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{
    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
    SchemaRef, TimeUnit, UnionMode,
};
use arrow::ipc::writer::{
    CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
};
use datafusion_common::{
    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, JoinSide, ScalarValue, Statistics,
    config::{
        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
        TableParquetOptions,
    },
    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
    parsers::CompressionTypeVariant,
    plan_datafusion_err,
    stats::Precision,
};

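/// Errors that can occur while serializing DataFusion types to their
/// protobuf representations.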
#[derive(Debug)]
pub enum Error {
    General(String),

    InvalidScalarValue(ScalarValue),

    InvalidScalarType(DataType),

    InvalidTimeUnit(TimeUnit),

    NotImplemented(String),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::General(desc) => write!(f, "General error: {desc}"),
            Self::InvalidScalarValue(value) => {
                write!(f, "{value:?} is invalid as a DataFusion scalar value")
            }
            Self::InvalidScalarType(data_type) => {
                write!(f, "{data_type} is invalid as a DataFusion scalar type")
            }
            Self::InvalidTimeUnit(time_unit) => {
                write!(
                    f,
                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
                )
            }
            Self::NotImplemented(s) => {
                write!(f, "Not implemented: {s}")
            }
        }
    }
}

impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}

impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}

impl TryFrom<&DataType> for protobuf::ArrowType {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
        Ok(Self {
            arrow_type_enum: Some(arrow_type_enum),
        })
    }
}

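/// Converts an Arrow [`DataType`] into the protobuf `ArrowTypeEnum` oneof.
/// Types without a protobuf representation (`RunEndEncoded`, `ListView`,
/// `LargeListView`) produce an [`Error::General`].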
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(
                        fields.iter().map(|(_, item)| item),
                    )?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal32(precision, scale) => Self::Decimal32(protobuf::Decimal32Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal64(precision, scale) => Self::Decimal64(protobuf::Decimal64Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal128(precision, scale) => Self::Decimal128(protobuf::Decimal128Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => Self::Map(Box::new(protobuf::Map {
                field_type: Some(Box::new(field.as_ref().try_into()?)),
                keys_sorted: *sorted,
            })),
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported"
                        .to_owned(),
                ));
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!(
                    "Proto serialization error: {val} not yet supported"
                )));
            }
        };

        Ok(res)
    }
}

impl From<Column> for protobuf::Column {
    fn from(c: Column) -> Self {
        Self {
            relation: c.relation.map(|relation| protobuf::ColumnRelation {
                relation: relation.to_string(),
            }),
            name: c.name,
        }
    }
}

impl From<&Column> for protobuf::Column {
    fn from(c: &Column) -> Self {
        c.clone().into()
    }
}

impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

impl TryFrom<SchemaRef> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

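/// Serializes a [`DFSchema`], keeping each field's optional relation
/// qualifier alongside the field itself.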
impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}

impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}

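/// Converts a [`ScalarValue`] into its protobuf representation. Primitive
/// values map directly onto the `scalar_value::Value` oneof, while nested
/// values (lists, structs, maps) are serialized as Arrow IPC data via
/// [`encode_scalar_nested_value`].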
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::Decimal32(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal32Value(protobuf::Decimal32 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal64(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal64Value(protobuf::Decimal64 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}

impl From<&TimeUnit> for protobuf::TimeUnit {
    fn from(val: &TimeUnit) -> Self {
        match val {
            TimeUnit::Second => protobuf::TimeUnit::Second,
            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
        }
    }
}

impl From<&IntervalUnit> for protobuf::IntervalUnit {
    fn from(interval_unit: &IntervalUnit) -> Self {
        match interval_unit {
            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
        }
    }
}

impl From<Constraints> for protobuf::Constraints {
    fn from(value: Constraints) -> Self {
        let constraints = value.into_iter().map(|item| item.into()).collect();
        protobuf::Constraints { constraints }
    }
}

impl From<Constraint> for protobuf::Constraint {
    fn from(value: Constraint) -> Self {
        let res = match value {
            Constraint::PrimaryKey(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::PrimaryKey(
                    protobuf::PrimaryKeyConstraint { indices },
                )
            }
            Constraint::Unique(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::Unique(
                    protobuf::UniqueConstraint { indices },
                )
            }
        };
        protobuf::Constraint {
            constraint_mode: Some(res),
        }
    }
}

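/// Serializes a cardinality-style [`Precision<usize>`] (row counts, byte
/// sizes) as a `Uint64Value` scalar tagged with the matching `PrecisionInfo`.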
impl From<&Precision<usize>> for protobuf::Precision {
    fn from(s: &Precision<usize>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Statistics> for protobuf::Statistics {
    fn from(s: &Statistics) -> protobuf::Statistics {
        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
        protobuf::Statistics {
            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
            column_stats,
        }
    }
}

impl From<&ColumnStatistics> for protobuf::ColumnStats {
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
            byte_size: Some(protobuf::Precision::from(&s.byte_size)),
        }
    }
}

impl From<JoinSide> for protobuf::JoinSide {
    fn from(t: JoinSide) -> Self {
        match t {
            JoinSide::Left => protobuf::JoinSide::LeftSide,
            JoinSide::Right => protobuf::JoinSide::RightSide,
            JoinSide::None => protobuf::JoinSide::None,
        }
    }
}

impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
    fn from(value: &CompressionTypeVariant) -> Self {
        match value {
            CompressionTypeVariant::GZIP => Self::Gzip,
            CompressionTypeVariant::BZIP2 => Self::Bzip2,
            CompressionTypeVariant::XZ => Self::Xz,
            CompressionTypeVariant::ZSTD => Self::Zstd,
            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
        }
    }
}

impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}

impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &JsonWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonWriterOptions {
            compression: compression.into(),
        })
    }
}

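/// Maps [`ParquetOptions`] onto the generated protobuf message; optional
/// settings are wrapped in their corresponding `parquet_options::*Opt` oneofs.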
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            force_filter_selections: value.force_filter_selections,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.to_string(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
            max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
        })
    }
}

impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}

impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}

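/// Serializes [`CsvOptions`]. Optional single-byte settings (header flag,
/// escape, comment, and similar) are encoded as empty or one-element byte
/// vectors so that "unset" survives the round trip.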
impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError;

    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
            truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
            compression_level: opts.compression_level,
        })
    }
}

impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
    type Error = DataFusionError;

    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonOptions {
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            compression_level: opts.compression_level,
        })
    }
}

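/// Builds a [`protobuf::ScalarValue`] from an optional primitive: a `Some`
/// value is converted with `constructor`, while `None` becomes a `NullValue`
/// carrying the Arrow type so the null stays typed.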
fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
    v: Option<&I>,
    null_arrow_type: &DataType,
    constructor: T,
) -> Result<protobuf::ScalarValue, Error> {
    let value = v
        .map(constructor)
        .unwrap_or(protobuf::scalar_value::Value::NullValue(
            null_arrow_type.try_into()?,
        ));

    Ok(protobuf::ScalarValue { value: Some(value) })
}

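/// Encodes a nested [`ScalarValue`] (list, large list, fixed-size list,
/// struct, or map) by wrapping its backing array in a single-column
/// [`RecordBatch`] and serializing that batch as an Arrow IPC message along
/// with any dictionary batches and the schema.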
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    let ipc_gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let write_options = IpcWriteOptions::default();
    let mut compression_context = CompressionContext::default();
    let (encoded_dictionaries, encoded_message) = ipc_gen
        .encode(
            &batch,
            &mut dict_tracker,
            &write_options,
            &mut compression_context,
        )
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        _ => unreachable!(),
    }
}

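/// Converts an iterator of `Arc<Field>` into protobuf [`protobuf::Field`]s,
/// failing fast on the first field with an unsupported data type.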
fn convert_arc_fields_to_proto_fields<'a, I>(
    fields: I,
) -> Result<Vec<protobuf::Field>, Error>
where
    I: IntoIterator<Item = &'a Arc<Field>>,
{
    fields
        .into_iter()
        .map(|field| field.as_ref().try_into())
        .collect::<Result<Vec<_>, Error>>()
}

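/// Builds a [`protobuf::CsvWriterOptions`] from an Arrow CSV [`WriterBuilder`]
/// and a compression variant; unset formats serialize as empty strings.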
pub(crate) fn csv_writer_options_to_proto(
    csv_options: &WriterBuilder,
    compression: &CompressionTypeVariant,
) -> protobuf::CsvWriterOptions {
    let compression: protobuf::CompressionTypeVariant = compression.into();
    protobuf::CsvWriterOptions {
        compression: compression.into(),
        delimiter: (csv_options.delimiter() as char).to_string(),
        has_header: csv_options.header(),
        date_format: csv_options.date_format().unwrap_or("").to_owned(),
        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
        time_format: csv_options.time_format().unwrap_or("").to_owned(),
        null_value: csv_options.null().to_owned(),
        quote: (csv_options.quote() as char).to_string(),
        escape: (csv_options.escape() as char).to_string(),
        double_quote: csv_options.double_quote(),
    }
}
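
// A minimal smoke-test sketch for the conversions above. It exercises only
// items defined in this file (the `TryFrom` impls for `protobuf::ArrowType`
// and `protobuf::ScalarValue`); broader round-trip coverage is assumed to
// live in the crate's dedicated serde test suite.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn primitive_type_and_scalar_conversion() {
        // A supported data type converts and populates the oneof.
        let proto_type: protobuf::ArrowType =
            (&DataType::Int32).try_into().expect("Int32 is supported");
        assert!(proto_type.arrow_type_enum.is_some());

        // A present scalar converts to a populated `Value`.
        let proto_scalar: protobuf::ScalarValue =
            (&ScalarValue::Int32(Some(42))).try_into().expect("scalar");
        assert!(proto_scalar.value.is_some());

        // A missing scalar still carries a typed `NullValue`.
        let proto_null: protobuf::ScalarValue =
            (&ScalarValue::Int32(None)).try_into().expect("null scalar");
        assert!(matches!(proto_null.value, Some(Value::NullValue(_))));
    }
}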