use std::collections::HashMap;
use std::sync::Arc;

use crate::protobuf_common as protobuf;
use crate::protobuf_common::{
    arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{
    DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
    SchemaRef, TimeUnit, UnionMode,
};
use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
use datafusion_common::{
    config::{
        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
        TableParquetOptions,
    },
    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
    parsers::CompressionTypeVariant,
    plan_datafusion_err,
    stats::Precision,
    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, JoinSide, ScalarValue, Statistics,
};

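/// Errors that can occur when converting Arrow and DataFusion types to their
/// protobuf representations.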
#[derive(Debug)]
pub enum Error {
    General(String),

    InvalidScalarValue(ScalarValue),

    InvalidScalarType(DataType),

    InvalidTimeUnit(TimeUnit),

    NotImplemented(String),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::General(desc) => write!(f, "General error: {desc}"),
            Self::InvalidScalarValue(value) => {
                write!(f, "{value:?} is invalid as a DataFusion scalar value")
            }
            Self::InvalidScalarType(data_type) => {
                write!(f, "{data_type:?} is invalid as a DataFusion scalar type")
            }
            Self::InvalidTimeUnit(time_unit) => {
                write!(
                    f,
                    "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
                )
            }
            Self::NotImplemented(s) => {
                write!(f, "Not implemented: {s}")
            }
        }
    }
}

impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}

impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}

impl TryFrom<&DataType> for protobuf::ArrowType {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
        Ok(Self {
            arrow_type_enum: Some(arrow_type_enum),
        })
    }
}

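// Maps each Arrow `DataType` onto the corresponding protobuf enum variant;
// types with no protobuf representation (`RunEndEncoded`, `ListView`,
// `LargeListView`) are rejected with `Error::General`.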
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(
                        fields.iter().map(|(_, item)| item),
                    )?,
                    union_mode: union_mode.into(),
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal32(precision, scale) => {
                Self::Decimal32(protobuf::Decimal32Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal64(precision, scale) => {
                Self::Decimal64(protobuf::Decimal64Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal128(precision, scale) => {
                Self::Decimal128(protobuf::Decimal128Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Decimal256(precision, scale) => {
                Self::Decimal256(protobuf::Decimal256Type {
                    precision: *precision as u32,
                    scale: *scale as i32,
                })
            }
            DataType::Map(field, sorted) => Self::Map(Box::new(protobuf::Map {
                field_type: Some(Box::new(field.as_ref().try_into()?)),
                keys_sorted: *sorted,
            })),
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported"
                        .to_owned(),
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!(
                    "Proto serialization error: {val} not yet supported"
                )))
            }
        };

        Ok(res)
    }
}

impl From<Column> for protobuf::Column {
    fn from(c: Column) -> Self {
        Self {
            relation: c.relation.map(|relation| protobuf::ColumnRelation {
                relation: relation.to_string(),
            }),
            name: c.name,
        }
    }
}

impl From<&Column> for protobuf::Column {
    fn from(c: &Column) -> Self {
        c.clone().into()
    }
}

impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

impl TryFrom<SchemaRef> for protobuf::Schema {
    type Error = Error;

    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}

impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}

impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}

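// Scalar conversion strategy: each non-null scalar maps onto the matching
// oneof variant of `protobuf::scalar_value::Value`; `None` becomes a typed
// `NullValue`, and nested scalars (lists, structs, maps) are encoded as
// Arrow IPC via `encode_scalar_nested_value`.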
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float16Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}

impl From<&TimeUnit> for protobuf::TimeUnit {
    fn from(val: &TimeUnit) -> Self {
        match val {
            TimeUnit::Second => protobuf::TimeUnit::Second,
            TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
            TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
            TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
        }
    }
}

impl From<&IntervalUnit> for protobuf::IntervalUnit {
    fn from(interval_unit: &IntervalUnit) -> Self {
        match interval_unit {
            IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
            IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
        }
    }
}

impl From<Constraints> for protobuf::Constraints {
    fn from(value: Constraints) -> Self {
        let constraints = value.into_iter().map(|item| item.into()).collect();
        protobuf::Constraints { constraints }
    }
}

impl From<Constraint> for protobuf::Constraint {
    fn from(value: Constraint) -> Self {
        let res = match value {
            Constraint::PrimaryKey(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::PrimaryKey(
                    protobuf::PrimaryKeyConstraint { indices },
                )
            }
            Constraint::Unique(indices) => {
                let indices = indices.into_iter().map(|item| item as u64).collect();
                protobuf::constraint::ConstraintMode::Unique(
                    protobuf::UniqueConstraint { indices },
                )
            }
        };
        protobuf::Constraint {
            constraint_mode: Some(res),
        }
    }
}

impl From<&Precision<usize>> for protobuf::Precision {
    fn from(s: &Precision<usize>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: Some(crate::protobuf_common::ScalarValue {
                    value: Some(Value::Uint64Value(*val as u64)),
                }),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}

impl From<&Statistics> for protobuf::Statistics {
    fn from(s: &Statistics) -> protobuf::Statistics {
        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
        protobuf::Statistics {
            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
            column_stats,
        }
    }
}

impl From<&ColumnStatistics> for protobuf::ColumnStats {
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
        }
    }
}

impl From<JoinSide> for protobuf::JoinSide {
    fn from(t: JoinSide) -> Self {
        match t {
            JoinSide::Left => protobuf::JoinSide::LeftSide,
            JoinSide::Right => protobuf::JoinSide::RightSide,
            JoinSide::None => protobuf::JoinSide::None,
        }
    }
}

impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
    fn from(value: &CompressionTypeVariant) -> Self {
        match value {
            CompressionTypeVariant::GZIP => Self::Gzip,
            CompressionTypeVariant::BZIP2 => Self::Bzip2,
            CompressionTypeVariant::XZ => Self::Xz,
            CompressionTypeVariant::ZSTD => Self::Zstd,
            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
        }
    }
}

impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &CsvWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}

impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &JsonWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonWriterOptions {
            compression: compression.into(),
        })
    }
}

impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
        })
    }
}

impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}

impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}

impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError;

    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
        })
    }
}

impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
    type Error = DataFusionError;

    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonOptions {
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
        })
    }
}

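/// Converts an optional scalar of type `I` into a `protobuf::ScalarValue`,
/// applying `constructor` to a present value and encoding `None` as a
/// `NullValue` typed by `null_arrow_type`.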
fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
    v: Option<&I>,
    null_arrow_type: &DataType,
    constructor: T,
) -> Result<protobuf::ScalarValue, Error> {
    let value = v
        .map(constructor)
        .unwrap_or(protobuf::scalar_value::Value::NullValue(
            null_arrow_type.try_into()?,
        ));

    Ok(protobuf::ScalarValue { value: Some(value) })
}

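/// Serializes a nested `ScalarValue` (`List`, `LargeList`, `FixedSizeList`,
/// `Struct`, or `Map`) by wrapping its backing array in a single-column
/// `RecordBatch` and encoding that batch as Arrow IPC messages.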
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    let gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let (encoded_dictionaries, encoded_message) = gen
        .encoded_batch(&batch, &mut dict_tracker, &Default::default())
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        _ => unreachable!(),
    }
}

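/// Converts an iterator of `Arc<Field>`s into the corresponding `protobuf::Field`s.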
fn convert_arc_fields_to_proto_fields<'a, I>(
    fields: I,
) -> Result<Vec<protobuf::Field>, Error>
where
    I: IntoIterator<Item = &'a Arc<Field>>,
{
    fields
        .into_iter()
        .map(|field| field.as_ref().try_into())
        .collect::<Result<Vec<_>, Error>>()
}

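/// Converts an Arrow CSV `WriterBuilder` plus a compression variant into
/// `protobuf::CsvWriterOptions`.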
pub(crate) fn csv_writer_options_to_proto(
    csv_options: &WriterBuilder,
    compression: &CompressionTypeVariant,
) -> protobuf::CsvWriterOptions {
    let compression: protobuf::CompressionTypeVariant = compression.into();
    protobuf::CsvWriterOptions {
        compression: compression.into(),
        delimiter: (csv_options.delimiter() as char).to_string(),
        has_header: csv_options.header(),
        date_format: csv_options.date_format().unwrap_or("").to_owned(),
        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
        time_format: csv_options.time_format().unwrap_or("").to_owned(),
        null_value: csv_options.null().to_owned(),
        quote: (csv_options.quote() as char).to_string(),
        escape: (csv_options.escape() as char).to_string(),
        double_quote: csv_options.double_quote(),
    }
}
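
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal, illustrative sketch of the conversions above, not an
    // exhaustive test suite. It assumes the prost-generated message types
    // derive `PartialEq` (prost does this by default for generated types).
    #[test]
    fn scalar_int32_to_proto() {
        let scalar = ScalarValue::Int32(Some(7));
        let proto = protobuf::ScalarValue::try_from(&scalar).expect("conversion");
        assert_eq!(proto.value, Some(Value::Int32Value(7)));
    }

    #[test]
    fn null_scalar_encodes_type() {
        // A null Int64 scalar should serialize as a typed NullValue so a
        // reader can reconstruct the original data type.
        let scalar = ScalarValue::Int64(None);
        let proto = protobuf::ScalarValue::try_from(&scalar).expect("conversion");
        assert!(matches!(proto.value, Some(Value::NullValue(_))));
    }

    #[test]
    fn arrow_type_utf8_to_proto() {
        let proto = protobuf::ArrowType::try_from(&DataType::Utf8).expect("conversion");
        assert_eq!(
            proto.arrow_type_enum,
            Some(ArrowTypeEnum::Utf8(EmptyMessage {}))
        );
    }
}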