use std::collections::HashMap;
use std::sync::Arc;

use crate::protobuf_common as protobuf;
use crate::protobuf_common::{
    arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{
    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
    SchemaRef, TimeUnit, UnionMode,
};
use arrow::ipc::writer::{
    CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
};
use datafusion_common::{
    config::{
        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
        TableParquetOptions,
    },
    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
    parsers::CompressionTypeVariant,
    plan_datafusion_err,
    stats::Precision,
    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, JoinSide, ScalarValue, Statistics,
};

#[derive(Debug)]
pub enum Error {
    General(String),

    InvalidScalarValue(ScalarValue),

    InvalidScalarType(DataType),

    InvalidTimeUnit(TimeUnit),

    NotImplemented(String),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::General(desc) => write!(f, "General error: {desc}"),
            Self::InvalidScalarValue(value) => {
                write!(f, "{value:?} is invalid as a DataFusion scalar value")
            }
            Self::InvalidScalarType(data_type) => {
                write!(f, "{data_type} is invalid as a DataFusion scalar type")
            }
            Self::InvalidTimeUnit(time_unit) => {
                write!(
                    f,
                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
                )
            }
            Self::NotImplemented(s) => {
                write!(f, "Not implemented: {s}")
            }
        }
    }
}

impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}

impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}
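
// A minimal test-only sketch of the `Field` conversion above. Note that
// `children` is always serialized empty: nested types travel inside
// `arrow_type` instead.
#[cfg(test)]
mod field_to_proto_example {
    use super::*;

    #[test]
    fn field_keeps_name_and_nullability() {
        let field = Field::new("id", DataType::Int64, false);
        let proto: protobuf::Field = (&field).try_into().expect("Int64 is supported");
        assert_eq!(proto.name, "id");
        assert!(!proto.nullable);
        assert!(proto.children.is_empty());
    }
}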

impl TryFrom<&DataType> for protobuf::ArrowType {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
        Ok(Self {
            arrow_type_enum: Some(arrow_type_enum),
        })
    }
}

impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(
                        fields.iter().map(|(_, item)| item),
                    )?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal32(precision, scale) => Self::Decimal32(protobuf::Decimal32Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal64(precision, scale) => Self::Decimal64(protobuf::Decimal64Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal128(precision, scale) => Self::Decimal128(protobuf::Decimal128Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => Self::Map(Box::new(protobuf::Map {
                field_type: Some(Box::new(field.as_ref().try_into()?)),
                keys_sorted: *sorted,
            })),
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported"
                        .to_owned(),
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!(
                    "Proto serialization error: {val} not yet supported"
                )))
            }
        };

        Ok(res)
    }
}
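
// A test-only sketch of the DataType mapping above: a representative supported
// type converts, and an explicitly unsupported one (ListView) is rejected with
// `Error::General`.
#[cfg(test)]
mod arrow_type_enum_example {
    use super::*;

    #[test]
    fn supported_and_unsupported_types() {
        let ts = DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()));
        let proto: ArrowTypeEnum = (&ts).try_into().expect("timestamps are supported");
        assert!(matches!(proto, ArrowTypeEnum::Timestamp(_)));

        let list_view =
            DataType::ListView(Arc::new(Field::new("item", DataType::Int32, true)));
        assert!(ArrowTypeEnum::try_from(&list_view).is_err());
    }
}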

impl From<Column> for protobuf::Column {
    fn from(c: Column) -> Self {
        Self {
            relation: c.relation.map(|relation| protobuf::ColumnRelation {
                relation: relation.to_string(),
            }),
            name: c.name,
        }
    }
}

impl From<&Column> for protobuf::Column {
    fn from(c: &Column) -> Self {
        c.clone().into()
    }
}

impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

impl TryFrom<SchemaRef> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}
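
// A small test-only sketch of the Schema conversion: the field list is carried
// over one-to-one.
#[cfg(test)]
mod schema_to_proto_example {
    use super::*;

    #[test]
    fn schema_carries_fields() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Float64, false),
        ]);
        let proto: protobuf::Schema = (&schema).try_into().expect("supported fields");
        assert_eq!(proto.columns.len(), 2);
    }
}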

impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}

impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}

impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::Decimal32(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal32Value(protobuf::Decimal32 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal64(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal64Value(protobuf::Decimal64 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}
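
// A test-only sketch of the scalar conversion: present values use their typed
// variant, while `None` is encoded as a typed null so the reader can
// reconstruct the original DataType.
#[cfg(test)]
mod scalar_value_to_proto_example {
    use super::*;

    #[test]
    fn none_becomes_typed_null() {
        let some: protobuf::ScalarValue =
            (&ScalarValue::Int32(Some(7))).try_into().unwrap();
        assert!(matches!(some.value, Some(Value::Int32Value(7))));

        let none: protobuf::ScalarValue =
            (&ScalarValue::Int32(None)).try_into().unwrap();
        assert!(matches!(none.value, Some(Value::NullValue(_))));
    }
}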

impl From<&TimeUnit> for protobuf::TimeUnit {
    fn from(val: &TimeUnit) -> Self {
        match val {
            TimeUnit::Second => protobuf::TimeUnit::Second,
            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
        }
    }
}

impl From<&IntervalUnit> for protobuf::IntervalUnit {
    fn from(interval_unit: &IntervalUnit) -> Self {
        match interval_unit {
            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
        }
    }
}

impl From<Constraints> for protobuf::Constraints {
    fn from(value: Constraints) -> Self {
        let constraints = value.into_iter().map(|item| item.into()).collect();
        protobuf::Constraints { constraints }
    }
}

impl From<Constraint> for protobuf::Constraint {
    fn from(value: Constraint) -> Self {
        let res = match value {
            Constraint::PrimaryKey(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::PrimaryKey(
                    protobuf::PrimaryKeyConstraint { indices },
                )
            }
            Constraint::Unique(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                // Unique constraints must round-trip as Unique; mapping them to
                // PrimaryKey would silently change the constraint's meaning.
                protobuf::constraint::ConstraintMode::Unique(
                    protobuf::UniqueConstraint { indices },
                )
            }
        };
        protobuf::Constraint {
            constraint_mode: Some(res),
        }
    }
}
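
// A test-only sketch of the constraint mapping; assumes the proto schema
// defines `UniqueConstraint` alongside `PrimaryKeyConstraint`.
#[cfg(test)]
mod constraint_to_proto_example {
    use super::*;

    #[test]
    fn primary_key_keeps_its_mode() {
        let proto: protobuf::Constraint = Constraint::PrimaryKey(vec![0, 2]).into();
        assert!(matches!(
            proto.constraint_mode,
            Some(protobuf::constraint::ConstraintMode::PrimaryKey(_))
        ));
    }
}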

impl From<&Precision<usize>> for protobuf::Precision {
    fn from(s: &Precision<usize>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}
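
// A test-only sketch: exact usize statistics are carried as Uint64 scalars,
// and Absent carries no value payload.
#[cfg(test)]
mod precision_to_proto_example {
    use super::*;

    #[test]
    fn exact_and_absent_encodings() {
        let exact = protobuf::Precision::from(&Precision::Exact(42usize));
        assert_eq!(exact.precision_info, protobuf::PrecisionInfo::Exact as i32);

        let absent = protobuf::Precision::from(&Precision::<usize>::Absent);
        assert_eq!(absent.val.and_then(|v| v.value), None);
    }
}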

impl From<&Statistics> for protobuf::Statistics {
    fn from(s: &Statistics) -> protobuf::Statistics {
        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
        protobuf::Statistics {
            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
            column_stats,
        }
    }
}

impl From<&ColumnStatistics> for protobuf::ColumnStats {
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
        }
    }
}

impl From<JoinSide> for protobuf::JoinSide {
    fn from(t: JoinSide) -> Self {
        match t {
            JoinSide::Left => protobuf::JoinSide::LeftSide,
            JoinSide::Right => protobuf::JoinSide::RightSide,
            JoinSide::None => protobuf::JoinSide::None,
        }
    }
}

impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
    fn from(value: &CompressionTypeVariant) -> Self {
        match value {
            CompressionTypeVariant::GZIP => Self::Gzip,
            CompressionTypeVariant::BZIP2 => Self::Bzip2,
            CompressionTypeVariant::XZ => Self::Xz,
            CompressionTypeVariant::ZSTD => Self::Zstd,
            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
        }
    }
}

impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &CsvWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}

impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &JsonWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonWriterOptions {
            compression: compression.into(),
        })
    }
}

impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
            max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
        })
    }
}
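
// A test-only sketch: the default ParquetOptions convert cleanly, with sizes
// widened to u64 on the wire.
#[cfg(test)]
mod parquet_options_to_proto_example {
    use super::*;

    #[test]
    fn default_options_convert() {
        let opts = ParquetOptions::default();
        let proto: protobuf::ParquetOptions =
            (&opts).try_into().expect("defaults convert");
        assert_eq!(proto.write_batch_size, opts.write_batch_size as u64);
    }
}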

impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}

impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}
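
// A test-only sketch of the metadata filtering above: entries whose value is
// `None` are dropped rather than serialized as empty strings.
#[cfg(test)]
mod table_parquet_options_to_proto_example {
    use super::*;

    #[test]
    fn none_metadata_values_are_dropped() {
        let mut opts = TableParquetOptions::default();
        opts.key_value_metadata
            .insert("kept".to_string(), Some("v".to_string()));
        opts.key_value_metadata.insert("dropped".to_string(), None);
        let proto: protobuf::TableParquetOptions = (&opts).try_into().unwrap();
        assert_eq!(proto.key_value_metadata.len(), 1);
    }
}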

impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError;

    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
            truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
        })
    }
}
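
// A test-only sketch of the Option-to-bytes convention above: optional single
// bytes become empty or one-element vectors on the wire.
#[cfg(test)]
mod csv_options_to_proto_example {
    use super::*;

    #[test]
    fn delimiter_is_a_one_byte_vector() {
        let opts = CsvOptions::default();
        let proto: protobuf::CsvOptions = (&opts).try_into().unwrap();
        assert_eq!(proto.delimiter, vec![opts.delimiter]);
    }
}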

impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
    type Error = DataFusionError;

    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonOptions {
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
        })
    }
}

/// Builds a `protobuf::ScalarValue` from an optional primitive: `Some` goes
/// through `constructor`, while `None` is encoded as a null tagged with
/// `null_arrow_type` so the type survives the round trip.
fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
    v: Option<&I>,
    null_arrow_type: &DataType,
    constructor: T,
) -> Result<protobuf::ScalarValue, Error> {
    let value = v
        .map(constructor)
        .unwrap_or(protobuf::scalar_value::Value::NullValue(
            null_arrow_type.try_into()?,
        ));

    Ok(protobuf::ScalarValue { value: Some(value) })
}
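
// A test-only sketch of the helper's two paths.
#[cfg(test)]
mod create_proto_scalar_example {
    use super::*;

    #[test]
    fn some_uses_constructor_none_uses_typed_null() {
        let some =
            create_proto_scalar(Some(&1i32), &DataType::Int32, |v| Value::Int32Value(*v))
                .unwrap();
        assert!(matches!(some.value, Some(Value::Int32Value(1))));

        let none =
            create_proto_scalar(None::<&i32>, &DataType::Int32, |v| Value::Int32Value(*v))
                .unwrap();
        assert!(matches!(none.value, Some(Value::NullValue(_))));
    }
}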

/// Encodes a nested `ScalarValue` (List, LargeList, FixedSizeList, Struct, or
/// Map) by wrapping its backing array in a single-column `RecordBatch` and
/// serializing that batch with the Arrow IPC format.
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    // `gen` is a reserved keyword in Rust 2024, so use a descriptive name.
    let data_gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let write_options = IpcWriteOptions::default();
    let mut compression_context = CompressionContext::default();
    let (encoded_dictionaries, encoded_message) = data_gen
        .encode(
            &batch,
            &mut dict_tracker,
            &write_options,
            &mut compression_context,
        )
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        _ => unreachable!(),
    }
}
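
// A test-only sketch of the nested path: a list scalar round-trips through the
// IPC encoding above. `ListArray::from_iter_primitive` is the arrow-rs
// constructor; the internal field name is arbitrary.
#[cfg(test)]
mod nested_scalar_to_proto_example {
    use super::*;
    use arrow::array::ListArray;
    use arrow::datatypes::Int32Type;

    #[test]
    fn list_scalar_is_ipc_encoded() {
        let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
            Some(1),
            Some(2),
        ])]);
        let scalar = ScalarValue::List(Arc::new(arr));
        let proto: protobuf::ScalarValue = (&scalar).try_into().unwrap();
        assert!(matches!(
            proto.value,
            Some(protobuf::scalar_value::Value::ListValue(_))
        ));
    }
}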

/// Converts an iterator of `Arc<Field>` into the protobuf field representation.
fn convert_arc_fields_to_proto_fields<'a, I>(
    fields: I,
) -> Result<Vec<protobuf::Field>, Error>
where
    I: IntoIterator<Item = &'a Arc<Field>>,
{
    fields
        .into_iter()
        .map(|field| field.as_ref().try_into())
        .collect::<Result<Vec<_>, Error>>()
}

pub(crate) fn csv_writer_options_to_proto(
    csv_options: &WriterBuilder,
    compression: &CompressionTypeVariant,
) -> protobuf::CsvWriterOptions {
    let compression: protobuf::CompressionTypeVariant = compression.into();
    protobuf::CsvWriterOptions {
        compression: compression.into(),
        delimiter: (csv_options.delimiter() as char).to_string(),
        has_header: csv_options.header(),
        date_format: csv_options.date_format().unwrap_or("").to_owned(),
        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
        time_format: csv_options.time_format().unwrap_or("").to_owned(),
        null_value: csv_options.null().to_owned(),
        quote: (csv_options.quote() as char).to_string(),
        escape: (csv_options.escape() as char).to_string(),
        double_quote: csv_options.double_quote(),
    }
}
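
// A test-only usage sketch: builder settings survive the conversion to proto.
#[cfg(test)]
mod csv_writer_options_to_proto_example {
    use super::*;

    #[test]
    fn builder_settings_survive_conversion() {
        let builder = WriterBuilder::new().with_delimiter(b';').with_header(false);
        let proto =
            csv_writer_options_to_proto(&builder, &CompressionTypeVariant::GZIP);
        assert_eq!(proto.delimiter, ";");
        assert!(!proto.has_header);
    }
}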