1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23 EmptyMessage, arrow_type::ArrowTypeEnum, scalar_value::Value,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28 DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29 SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{
32 CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
33};
34use datafusion_common::{
35 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
36 DataFusionError, JoinSide, ScalarValue, Statistics,
37 config::{
38 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
39 TableParquetOptions,
40 },
41 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
42 parsers::CompressionTypeVariant,
43 plan_datafusion_err,
44 stats::Precision,
45};
46
/// Errors produced while serializing Arrow / DataFusion types into their
/// protobuf representations.
#[derive(Debug)]
pub enum Error {
    /// Catch-all serialization failure with a free-form description.
    General(String),

    /// The scalar value cannot be represented as a protobuf scalar.
    InvalidScalarValue(ScalarValue),

    /// The data type is not valid as a DataFusion scalar type.
    InvalidScalarType(DataType),

    /// A time unit outside the supported set was encountered.
    InvalidTimeUnit(TimeUnit),

    /// Serialization of this construct is not implemented yet.
    NotImplemented(String),
}
59
// Marker impl: no `source()`/cause chain — `Debug` and `Display` carry all detail.
impl std::error::Error for Error {}
61
62impl std::fmt::Display for Error {
63 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64 match self {
65 Self::General(desc) => write!(f, "General error: {desc}"),
66 Self::InvalidScalarValue(value) => {
67 write!(f, "{value:?} is invalid as a DataFusion scalar value")
68 }
69 Self::InvalidScalarType(data_type) => {
70 write!(f, "{data_type} is invalid as a DataFusion scalar type")
71 }
72 Self::InvalidTimeUnit(time_unit) => {
73 write!(
74 f,
75 "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
76 )
77 }
78 Self::NotImplemented(s) => {
79 write!(f, "Not implemented: {s}")
80 }
81 }
82 }
83}
84
impl From<Error> for DataFusionError {
    /// Wraps a serialization error as a DataFusion plan error, preserving the
    /// `Display` text of `e`.
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
90
91impl TryFrom<&Field> for protobuf::Field {
92 type Error = Error;
93
94 fn try_from(field: &Field) -> Result<Self, Self::Error> {
95 let arrow_type = field.data_type().try_into()?;
96 Ok(Self {
97 name: field.name().to_owned(),
98 arrow_type: Some(Box::new(arrow_type)),
99 nullable: field.is_nullable(),
100 children: Vec::new(),
101 metadata: field.metadata().clone(),
102 })
103 }
104}
105
106impl TryFrom<&DataType> for protobuf::ArrowType {
107 type Error = Error;
108
109 fn try_from(val: &DataType) -> Result<Self, Self::Error> {
110 let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
111 Ok(Self {
112 arrow_type_enum: Some(arrow_type_enum),
113 })
114 }
115}
116
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    /// Maps an Arrow `DataType` onto the protobuf `ArrowTypeEnum` oneof.
    ///
    /// # Errors
    /// Returns `Error::General` for `ListView` / `LargeListView`, which have
    /// no protobuf representation yet; nested field conversions propagate
    /// their own errors.
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            // Parameterless types map to `EmptyMessage` placeholders.
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    // A missing timezone is encoded as the empty string.
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            // Time/duration types only need the unit, encoded as the proto
            // enum's integer value.
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            // Nested types recurse through `TryFrom<&Field>`.
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(
                        fields.iter().map(|(_, item)| item),
                    )?,
                    union_mode: union_mode.into(),
                    // Type ids are stored alongside the fields so sparse id
                    // assignments survive the round-trip.
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            // Decimal precision/scale are widened to the proto field widths.
            DataType::Decimal32(precision, scale) => {
                Self::Decimal32(protobuf::Decimal32Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal64(precision, scale) => {
                Self::Decimal64(protobuf::Decimal64Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal128(precision, scale) => {
                Self::Decimal128(protobuf::Decimal128Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal256(precision, scale) => {
                Self::Decimal256(protobuf::Decimal256Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Map(field, sorted) => Self::Map(Box::new(protobuf::Map {
                field_type: Some(Box::new(field.as_ref().try_into()?)),
                keys_sorted: *sorted,
            })),
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                Self::RunEndEncoded(Box::new(protobuf::RunEndEncoded {
                    run_ends_field: Some(Box::new(run_ends_field.as_ref().try_into()?)),
                    values_field: Some(Box::new(values_field.as_ref().try_into()?)),
                }))
            }
            // No proto messages exist for the view-list types yet.
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!(
                    "Proto serialization error: {val} not yet supported"
                )));
            }
        };

        Ok(res)
    }
}
240
241impl From<Column> for protobuf::Column {
242 fn from(c: Column) -> Self {
243 Self {
244 relation: c.relation.map(|relation| protobuf::ColumnRelation {
245 relation: relation.to_string(),
246 }),
247 name: c.name,
248 }
249 }
250}
251
252impl From<&Column> for protobuf::Column {
253 fn from(c: &Column) -> Self {
254 c.clone().into()
255 }
256}
257
258impl TryFrom<&Schema> for protobuf::Schema {
259 type Error = Error;
260
261 fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
262 Ok(Self {
263 columns: convert_arc_fields_to_proto_fields(schema.fields())?,
264 metadata: schema.metadata.clone(),
265 })
266 }
267}
268
269impl TryFrom<SchemaRef> for protobuf::Schema {
270 type Error = Error;
271
272 fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
273 Ok(Self {
274 columns: convert_arc_fields_to_proto_fields(schema.fields())?,
275 metadata: schema.metadata.clone(),
276 })
277 }
278}
279
280impl TryFrom<&DFSchema> for protobuf::DfSchema {
281 type Error = Error;
282
283 fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
284 let columns = s
285 .iter()
286 .map(|(qualifier, field)| {
287 Ok(protobuf::DfField {
288 field: Some(field.as_ref().try_into()?),
289 qualifier: qualifier.map(|r| protobuf::ColumnRelation {
290 relation: r.to_string(),
291 }),
292 })
293 })
294 .collect::<Result<Vec<_>, Error>>()?;
295 Ok(Self {
296 columns,
297 metadata: s.metadata().clone(),
298 })
299 }
300}
301
impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    /// Delegates to the `&DFSchema` conversion through the `Arc`.
    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}
309
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    /// Encodes a DataFusion `ScalarValue` into its protobuf message.
    ///
    /// Primitive variants go through `create_proto_scalar`, which encodes a
    /// typed null when the inner `Option` is `None`; nested variants
    /// (list/struct/map) are serialized via `encode_scalar_nested_value`.
    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        // The data type is needed up front so null payloads stay typed.
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            // Float16 has no dedicated proto field; widened losslessly to f32.
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            // Small ints are widened to the 32-bit proto carrier types.
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            // Nested values are shipped as an Arrow IPC payload.
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            // Timestamps carry the (possibly empty) timezone string alongside
            // the unit-tagged integer value.
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            // Decimals are serialized as big-endian bytes plus precision/scale.
            ScalarValue::Decimal32(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal32Value(protobuf::Decimal32 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal64(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal64Value(protobuf::Decimal64 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            // Intervals are split into their component parts before encoding.
            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::Union(val, df_fields, mode) => {
                // Serialize the full field list so the union layout can be
                // reconstructed on the other side.
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    // A null union has no id; 0 is used as the placeholder.
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            ScalarValue::Dictionary(index_type, val) => {
                // Encode the inner value recursively, tagged with its key type.
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }

            ScalarValue::RunEndEncoded(run_ends_field, values_field, val) => {
                Ok(protobuf::ScalarValue {
                    value: Some(Value::RunEndEncodedValue(Box::new(
                        protobuf::ScalarRunEndEncodedValue {
                            run_ends_field: Some(run_ends_field.as_ref().try_into()?),
                            values_field: Some(values_field.as_ref().try_into()?),
                            value: Some(Box::new(val.as_ref().try_into()?)),
                        },
                    ))),
                })
            }
        }
    }
}
707
708impl From<&TimeUnit> for protobuf::TimeUnit {
709 fn from(val: &TimeUnit) -> Self {
710 match val {
711 TimeUnit::Second => protobuf::TimeUnit::Second,
712 TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
713 TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
714 TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
715 }
716 }
717}
718
719impl From<&IntervalUnit> for protobuf::IntervalUnit {
720 fn from(interval_unit: &IntervalUnit) -> Self {
721 match interval_unit {
722 IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
723 IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
724 IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
725 }
726 }
727}
728
729impl From<Constraints> for protobuf::Constraints {
730 fn from(value: Constraints) -> Self {
731 let constraints = value.into_iter().map(|item| item.into()).collect();
732 protobuf::Constraints { constraints }
733 }
734}
735
736impl From<Constraint> for protobuf::Constraint {
737 fn from(value: Constraint) -> Self {
738 let res = match value {
739 Constraint::PrimaryKey(indices) => {
740 let indices = indices.into_iter().map(|item| item as u64).collect();
741 protobuf::constraint::ConstraintMode::PrimaryKey(
742 protobuf::PrimaryKeyConstraint { indices },
743 )
744 }
745 Constraint::Unique(indices) => {
746 let indices = indices.into_iter().map(|item| item as u64).collect();
747 protobuf::constraint::ConstraintMode::PrimaryKey(
748 protobuf::PrimaryKeyConstraint { indices },
749 )
750 }
751 };
752 protobuf::Constraint {
753 constraint_mode: Some(res),
754 }
755 }
756}
757
758impl From<&Precision<usize>> for protobuf::Precision {
759 fn from(s: &Precision<usize>) -> protobuf::Precision {
760 match s {
761 Precision::Exact(val) => protobuf::Precision {
762 precision_info: protobuf::PrecisionInfo::Exact.into(),
763 val: Some(crate::protobuf_common::ScalarValue {
764 value: Some(Value::Uint64Value(*val as u64)),
765 }),
766 },
767 Precision::Inexact(val) => protobuf::Precision {
768 precision_info: protobuf::PrecisionInfo::Inexact.into(),
769 val: Some(crate::protobuf_common::ScalarValue {
770 value: Some(Value::Uint64Value(*val as u64)),
771 }),
772 },
773 Precision::Absent => protobuf::Precision {
774 precision_info: protobuf::PrecisionInfo::Absent.into(),
775 val: Some(crate::protobuf_common::ScalarValue { value: None }),
776 },
777 }
778 }
779}
780
impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    /// Encodes a scalar-typed precision (e.g. column min/max/sum statistics).
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                // NOTE(review): `.ok()` silently drops scalar-serialization
                // failures, leaving `val` as `None` while `precision_info`
                // still says `Exact` — confirm this lossy behavior is intended.
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                // Same silent-drop behavior as the `Exact` arm above.
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}
799
800impl From<&Statistics> for protobuf::Statistics {
801 fn from(s: &Statistics) -> protobuf::Statistics {
802 let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
803 protobuf::Statistics {
804 num_rows: Some(protobuf::Precision::from(&s.num_rows)),
805 total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
806 column_stats,
807 }
808 }
809}
810
impl From<&ColumnStatistics> for protobuf::ColumnStats {
    /// Converts per-column statistics; each field is wrapped in a `Precision`
    /// message so the exact/inexact/absent classification survives the
    /// round-trip.
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
            byte_size: Some(protobuf::Precision::from(&s.byte_size)),
        }
    }
}
823
824impl From<JoinSide> for protobuf::JoinSide {
825 fn from(t: JoinSide) -> Self {
826 match t {
827 JoinSide::Left => protobuf::JoinSide::LeftSide,
828 JoinSide::Right => protobuf::JoinSide::RightSide,
829 JoinSide::None => protobuf::JoinSide::None,
830 }
831 }
832}
833
834impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
835 fn from(value: &CompressionTypeVariant) -> Self {
836 match value {
837 CompressionTypeVariant::GZIP => Self::Gzip,
838 CompressionTypeVariant::BZIP2 => Self::Bzip2,
839 CompressionTypeVariant::XZ => Self::Xz,
840 CompressionTypeVariant::ZSTD => Self::Zstd,
841 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
842 }
843 }
844}
845
impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    /// Delegates to the `csv_writer_options_to_proto` helper (defined
    /// elsewhere in this module). Always returns `Ok`; the `TryFrom` shape
    /// matches the sibling writer-option conversions.
    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}
856
857impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
858 type Error = DataFusionError;
859
860 fn try_from(
861 opts: &JsonWriterOptions,
862 ) -> datafusion_common::Result<Self, Self::Error> {
863 let compression: protobuf::CompressionTypeVariant = opts.compression.into();
864 Ok(protobuf::JsonWriterOptions {
865 compression: compression.into(),
866 })
867 }
868}
869
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    /// Field-by-field conversion of global Parquet reader/writer options.
    ///
    /// `usize` sizes are widened to `u64`; optional settings map to the
    /// generated `*_opt` oneof wrappers (`None` leaves the oneof unset).
    /// Always returns `Ok` despite the `TryFrom` shape.
    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            force_filter_selections: value.force_filter_selections,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.to_string(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
            max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
        })
    }
}
910
impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    /// Converts per-column Parquet overrides. Every field is optional and
    /// maps to a generated `*_opt` oneof wrapper; `None` leaves the oneof
    /// unset. Always returns `Ok` despite the `TryFrom` shape.
    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}
945
impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;

    /// Converts the full table-level Parquet configuration: global options,
    /// per-column overrides, and key/value metadata.
    ///
    /// # Errors
    /// Propagates failures from the per-column and global conversions.
    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        // Entries whose value is `None` are dropped: the proto map cannot
        // represent a key with an absent value.
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}
973
impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError;

    /// Converts CSV read/write options. Optional single-byte/bool settings
    /// are encoded as zero-or-one-element `Vec<u8>`s (empty = unset), and
    /// optional strings default to `""`. Always returns `Ok` despite the
    /// `TryFrom` shape.
    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
            truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
            compression_level: opts.compression_level,
        })
    }
}
1004
1005impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
1006 type Error = DataFusionError;
1007
1008 fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
1009 let compression: protobuf::CompressionTypeVariant = opts.compression.into();
1010 Ok(protobuf::JsonOptions {
1011 compression: compression.into(),
1012 schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
1013 compression_level: opts.compression_level,
1014 newline_delimited: Some(opts.newline_delimited),
1015 })
1016 }
1017}
1018
1019fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
1022 v: Option<&I>,
1023 null_arrow_type: &DataType,
1024 constructor: T,
1025) -> Result<protobuf::ScalarValue, Error> {
1026 let value = v
1027 .map(constructor)
1028 .unwrap_or(protobuf::scalar_value::Value::NullValue(
1029 null_arrow_type.try_into()?,
1030 ));
1031
1032 Ok(protobuf::ScalarValue { value: Some(value) })
1033}
1034
/// Serializes a nested `ScalarValue` (list/struct/map variants) by wrapping its
/// backing array in a single-column `RecordBatch` and encoding that batch as
/// Arrow IPC bytes, which are then stored in a `protobuf::ScalarNestedValue`.
///
/// `val` is only inspected to choose which protobuf variant wraps the encoded
/// payload; `arr` supplies the actual data. Callers must pass one of the
/// nested variants — any other variant hits `unreachable!()`.
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    // The column name is arbitrary; only the data and schema layout matter for
    // round-tripping.
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding nested ScalarValue: {e}"
        ))
    })?;

    let ipc_gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let write_options = IpcWriteOptions::default();
    // Return value intentionally discarded: presumably called for its side
    // effect of registering the schema's dictionaries in `dict_tracker` before
    // `encode` runs — NOTE(review): confirm against arrow-ipc docs.
    ipc_gen.schema_to_bytes_with_dictionary_tracker(
        batch.schema().as_ref(),
        &mut dict_tracker,
        &write_options,
    );
    let mut compression_context = CompressionContext::default();
    // Encode the batch body plus any dictionary batches it references.
    let (encoded_dictionaries, encoded_message) = ipc_gen
        .encode(
            &batch,
            &mut dict_tracker,
            &write_options,
            &mut compression_context,
        )
        .map_err(|e| {
            Error::General(format!("Error encoding nested ScalarValue as IPC: {e}"))
        })?;

    // The schema is carried separately in proto form so the reader can decode
    // without re-parsing the IPC schema message.
    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    // Pick the protobuf variant matching the scalar's kind; the payload bytes
    // are identical for all of them.
    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        // Contract: callers only invoke this for the nested variants above.
        _ => unreachable!(),
    }
}
1109
1110fn convert_arc_fields_to_proto_fields<'a, I>(
1112 fields: I,
1113) -> Result<Vec<protobuf::Field>, Error>
1114where
1115 I: IntoIterator<Item = &'a Arc<Field>>,
1116{
1117 fields
1118 .into_iter()
1119 .map(|field| field.as_ref().try_into())
1120 .collect::<Result<Vec<_>, Error>>()
1121}
1122
1123pub(crate) fn csv_writer_options_to_proto(
1124 csv_options: &WriterBuilder,
1125 compression: &CompressionTypeVariant,
1126) -> protobuf::CsvWriterOptions {
1127 let compression: protobuf::CompressionTypeVariant = compression.into();
1128 protobuf::CsvWriterOptions {
1129 compression: compression.into(),
1130 delimiter: (csv_options.delimiter() as char).to_string(),
1131 has_header: csv_options.header(),
1132 date_format: csv_options.date_format().unwrap_or("").to_owned(),
1133 datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
1134 timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
1135 time_format: csv_options.time_format().unwrap_or("").to_owned(),
1136 null_value: csv_options.null().to_owned(),
1137 quote: (csv_options.quote() as char).to_string(),
1138 escape: (csv_options.escape() as char).to_string(),
1139 double_quote: csv_options.double_quote(),
1140 }
1141}