1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23 arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28 DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29 SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
32use datafusion_common::{
33 config::{
34 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
35 TableParquetOptions,
36 },
37 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
38 parsers::CompressionTypeVariant,
39 plan_datafusion_err,
40 stats::Precision,
41 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
42 DataFusionError, JoinSide, ScalarValue, Statistics,
43};
44
/// Errors that can occur while serializing DataFusion / Arrow types
/// into their protobuf representations.
#[derive(Debug)]
pub enum Error {
    /// Catch-all serialization failure with a free-form message.
    General(String),

    /// The scalar value has no valid protobuf encoding.
    InvalidScalarValue(ScalarValue),

    /// The data type is not valid as a DataFusion scalar type.
    InvalidScalarType(DataType),

    /// The time unit is not accepted in the context it appeared in.
    InvalidTimeUnit(TimeUnit),

    /// Serialization of this construct has not been implemented yet.
    NotImplemented(String),
}
57
// Marker impl: `Error` carries no underlying source error, so the
// default `std::error::Error` methods suffice.
impl std::error::Error for Error {}
59
60impl std::fmt::Display for Error {
61 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62 match self {
63 Self::General(desc) => write!(f, "General error: {desc}"),
64 Self::InvalidScalarValue(value) => {
65 write!(f, "{value:?} is invalid as a DataFusion scalar value")
66 }
67 Self::InvalidScalarType(data_type) => {
68 write!(f, "{data_type:?} is invalid as a DataFusion scalar type")
69 }
70 Self::InvalidTimeUnit(time_unit) => {
71 write!(
72 f,
73 "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
74 )
75 }
76 Self::NotImplemented(s) => {
77 write!(f, "Not implemented: {s}")
78 }
79 }
80 }
81}
82
impl From<Error> for DataFusionError {
    /// Wraps a serialization error as a DataFusion planning error so it
    /// can flow through the standard `Result` plumbing.
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
88
impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    /// Serializes an Arrow field: name, type, nullability, and metadata.
    ///
    /// `children` is always left empty here — nested types carry their
    /// child fields inside the serialized `arrow_type` instead (see the
    /// `Struct`/`List` cases of the `ArrowTypeEnum` conversion).
    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            children: Vec::new(),
            metadata: field.metadata().clone(),
        })
    }
}
103
104impl TryFrom<&DataType> for protobuf::ArrowType {
105 type Error = Error;
106
107 fn try_from(val: &DataType) -> Result<Self, Self::Error> {
108 let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
109 Ok(Self {
110 arrow_type_enum: Some(arrow_type_enum),
111 })
112 }
113}
114
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    /// Serializes an Arrow `DataType` into the protobuf `ArrowTypeEnum` oneof.
    ///
    /// Primitive types map onto empty marker messages; parameterized types
    /// (timestamps, lists, structs, unions, dictionaries, decimals, maps)
    /// recursively serialize their component types/fields.
    ///
    /// # Errors
    /// Returns `Error::General` for types with no protobuf representation
    /// yet (`RunEndEncoded`, `ListView`, `LargeListView`), and propagates
    /// any error from nested field/type conversions.
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    // An absent timezone is encoded as the empty string.
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            // List-like types recurse into their element field.
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(fields.iter().map(|(_, item)| item))?,
                    union_mode: union_mode.into(),
                    // Type ids travel alongside the fields in matching order.
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal128(precision, scale) => Self::Decimal(protobuf::Decimal {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => {
                Self::Map(Box::new(
                    protobuf::Map {
                        field_type: Some(Box::new(field.as_ref().try_into()?)),
                        keys_sorted: *sorted,
                    }
                ))
            }
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
            }
        };

        Ok(res)
    }
}
221
222impl From<Column> for protobuf::Column {
223 fn from(c: Column) -> Self {
224 Self {
225 relation: c.relation.map(|relation| protobuf::ColumnRelation {
226 relation: relation.to_string(),
227 }),
228 name: c.name,
229 }
230 }
231}
232
233impl From<&Column> for protobuf::Column {
234 fn from(c: &Column) -> Self {
235 c.clone().into()
236 }
237}
238
impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    /// Serializes an Arrow schema: every field plus schema-level metadata.
    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}
249
250impl TryFrom<SchemaRef> for protobuf::Schema {
251 type Error = Error;
252
253 fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
254 Ok(Self {
255 columns: convert_arc_fields_to_proto_fields(schema.fields())?,
256 metadata: schema.metadata.clone(),
257 })
258 }
259}
260
impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    /// Serializes a DataFusion logical schema, preserving each field's
    /// optional table qualifier alongside the field itself.
    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    // Unqualified columns serialize with no relation.
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}
282
impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    /// `Arc`-wrapped variant; delegates to the `&DFSchema` conversion.
    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}
290
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    /// Serializes a DataFusion `ScalarValue` into its protobuf message.
    ///
    /// Conventions used throughout this conversion:
    /// - A `None` payload (a "typed null") is encoded as `Value::NullValue`
    ///   carrying the serialized data type, so the type survives round-trips.
    /// - Nested values (`List`, `LargeList`, `FixedSizeList`, `Struct`,
    ///   `Map`) are serialized via `encode_scalar_nested_value` as Arrow
    ///   IPC-encoded single-column batches.
    /// - Integers narrower than the protobuf field are widened
    ///   (e.g. `i8`/`i16` travel as `i32`, `u8`/`u16` as `u32`).
    /// - Absent timezones on timestamps are encoded as empty strings.
    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            // Float16 has no dedicated protobuf field; it is widened to f32.
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            // Nested values go through the Arrow IPC encoding path.
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            // Decimals serialize the i128/i256 payload as big-endian bytes.
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            // Intervals are decomposed into their component parts before
            // serialization rather than shipping the packed representation.
            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            // A union scalar carries the full field list so the receiver can
            // reconstruct the union type, plus the active variant's id/value.
            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                // A null union value is encoded with value_id 0 and no payload.
                let val = protobuf::UnionValue {
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            // Dictionary scalars serialize the key type plus the (recursively
            // serialized) underlying value.
            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}
640
641impl From<&TimeUnit> for protobuf::TimeUnit {
642 fn from(val: &TimeUnit) -> Self {
643 match val {
644 TimeUnit::Second => protobuf::TimeUnit::Second,
645 TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
646 TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
647 TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
648 }
649 }
650}
651
652impl From<&IntervalUnit> for protobuf::IntervalUnit {
653 fn from(interval_unit: &IntervalUnit) -> Self {
654 match interval_unit {
655 IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
656 IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
657 IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
658 }
659 }
660}
661
662impl From<Constraints> for protobuf::Constraints {
663 fn from(value: Constraints) -> Self {
664 let constraints = value.into_iter().map(|item| item.into()).collect();
665 protobuf::Constraints { constraints }
666 }
667}
668
669impl From<Constraint> for protobuf::Constraint {
670 fn from(value: Constraint) -> Self {
671 let res = match value {
672 Constraint::PrimaryKey(indices) => {
673 let indices = indices.into_iter().map(|item| item as u64).collect();
674 protobuf::constraint::ConstraintMode::PrimaryKey(
675 protobuf::PrimaryKeyConstraint { indices },
676 )
677 }
678 Constraint::Unique(indices) => {
679 let indices = indices.into_iter().map(|item| item as u64).collect();
680 protobuf::constraint::ConstraintMode::PrimaryKey(
681 protobuf::PrimaryKeyConstraint { indices },
682 )
683 }
684 };
685 protobuf::Constraint {
686 constraint_mode: Some(res),
687 }
688 }
689}
690
691impl From<&Precision<usize>> for protobuf::Precision {
692 fn from(s: &Precision<usize>) -> protobuf::Precision {
693 match s {
694 Precision::Exact(val) => protobuf::Precision {
695 precision_info: protobuf::PrecisionInfo::Exact.into(),
696 val: Some(crate::protobuf_common::ScalarValue {
697 value: Some(Value::Uint64Value(*val as u64)),
698 }),
699 },
700 Precision::Inexact(val) => protobuf::Precision {
701 precision_info: protobuf::PrecisionInfo::Inexact.into(),
702 val: Some(crate::protobuf_common::ScalarValue {
703 value: Some(Value::Uint64Value(*val as u64)),
704 }),
705 },
706 Precision::Absent => protobuf::Precision {
707 precision_info: protobuf::PrecisionInfo::Absent.into(),
708 val: Some(crate::protobuf_common::ScalarValue { value: None }),
709 },
710 }
711 }
712}
713
714impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
715 fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
716 match s {
717 Precision::Exact(val) => protobuf::Precision {
718 precision_info: protobuf::PrecisionInfo::Exact.into(),
719 val: val.try_into().ok(),
720 },
721 Precision::Inexact(val) => protobuf::Precision {
722 precision_info: protobuf::PrecisionInfo::Inexact.into(),
723 val: val.try_into().ok(),
724 },
725 Precision::Absent => protobuf::Precision {
726 precision_info: protobuf::PrecisionInfo::Absent.into(),
727 val: Some(crate::protobuf_common::ScalarValue { value: None }),
728 },
729 }
730 }
731}
732
733impl From<&Statistics> for protobuf::Statistics {
734 fn from(s: &Statistics) -> protobuf::Statistics {
735 let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
736 protobuf::Statistics {
737 num_rows: Some(protobuf::Precision::from(&s.num_rows)),
738 total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
739 column_stats,
740 }
741 }
742}
743
impl From<&ColumnStatistics> for protobuf::ColumnStats {
    /// Serializes per-column statistics. Every precision is wrapped in
    /// `Some` so that even absent values are encoded explicitly.
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
        }
    }
}
755
756impl From<JoinSide> for protobuf::JoinSide {
757 fn from(t: JoinSide) -> Self {
758 match t {
759 JoinSide::Left => protobuf::JoinSide::LeftSide,
760 JoinSide::Right => protobuf::JoinSide::RightSide,
761 JoinSide::None => protobuf::JoinSide::None,
762 }
763 }
764}
765
766impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
767 fn from(value: &CompressionTypeVariant) -> Self {
768 match value {
769 CompressionTypeVariant::GZIP => Self::Gzip,
770 CompressionTypeVariant::BZIP2 => Self::Bzip2,
771 CompressionTypeVariant::XZ => Self::Xz,
772 CompressionTypeVariant::ZSTD => Self::Zstd,
773 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
774 }
775 }
776}
777
impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    /// Serializes CSV writer options via the shared
    /// `csv_writer_options_to_proto` helper (which is infallible; the
    /// `TryFrom` shape matches the sibling writer-option conversions).
    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}
788
789impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
790 type Error = DataFusionError;
791
792 fn try_from(
793 opts: &JsonWriterOptions,
794 ) -> datafusion_common::Result<Self, Self::Error> {
795 let compression: protobuf::CompressionTypeVariant = opts.compression.into();
796 Ok(protobuf::JsonWriterOptions {
797 compression: compression.into(),
798 })
799 }
800}
801
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    /// Serializes session-level Parquet options field-for-field.
    ///
    /// Optional settings are encoded as protobuf `oneof` wrappers
    /// (`*_opt` fields); `usize` sizes are widened to `u64`.
    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            // `max_statistics_size` is deprecated upstream but still carried
            // for wire compatibility.
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| protobuf::parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v as u64)),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
        })
    }
}
842
impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = Error;

    /// Serializes per-column Parquet option overrides; each optional
    /// setting is encoded as a protobuf `oneof` wrapper (`*_opt`).
    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            // Deprecated upstream; still serialized for wire compatibility.
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| {
                protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(
                    v as u32,
                )
            }),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}
883
impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;

    /// Serializes table-level Parquet options: the global options, the
    /// per-column overrides, and the key/value file metadata.
    ///
    /// Metadata entries whose value is `None` are dropped from the
    /// serialized map (the protobuf map cannot represent them).
    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}
911
impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError;

    /// Serializes CSV reader/writer options.
    ///
    /// Encoding conventions:
    /// - Optional single-byte settings (header flag, terminator, escape,
    ///   comment, …) travel as a `Vec<u8>` that is empty for `None` and
    ///   holds one byte for `Some` (bools become 0/1).
    /// - Optional format strings travel as plain strings, with `None`
    ///   encoded as the empty string.
    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
        })
    }
}
940
941impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
942 type Error = DataFusionError;
943
944 fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
945 let compression: protobuf::CompressionTypeVariant = opts.compression.into();
946 Ok(protobuf::JsonOptions {
947 compression: compression.into(),
948 schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
949 })
950 }
951}
952
953fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
956 v: Option<&I>,
957 null_arrow_type: &DataType,
958 constructor: T,
959) -> Result<protobuf::ScalarValue, Error> {
960 let value = v
961 .map(constructor)
962 .unwrap_or(protobuf::scalar_value::Value::NullValue(
963 null_arrow_type.try_into()?,
964 ));
965
966 Ok(protobuf::ScalarValue { value: Some(value) })
967}
968
/// Serializes a nested scalar (list/struct/map and friends) by wrapping
/// its backing array in a one-column `RecordBatch` and encoding it with
/// the Arrow IPC format.
///
/// The resulting message carries the IPC-encoded batch, any dictionary
/// batches it needs, and the batch schema so the receiver can decode it.
/// The original `ScalarValue` variant selects which protobuf oneof field
/// the payload lands in.
///
/// # Panics
/// `unreachable!` for any variant other than `List`, `LargeList`,
/// `FixedSizeList`, `Struct`, or `Map` — callers must only pass those.
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    // The column name is arbitrary; only the array payload matters.
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    let gen = IpcDataGenerator {};
    // `false` = do not error on replacement; dictionaries may repeat.
    let mut dict_tracker = DictionaryTracker::new(false);
    let (encoded_dictionaries, encoded_message) = gen
        .encoded_batch(&batch, &mut dict_tracker, &Default::default())
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    // Route the identical payload into the oneof field that matches the
    // originating scalar variant.
    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        _ => unreachable!(),
    }
}
1029
1030fn convert_arc_fields_to_proto_fields<'a, I>(
1032 fields: I,
1033) -> Result<Vec<protobuf::Field>, Error>
1034where
1035 I: IntoIterator<Item = &'a Arc<Field>>,
1036{
1037 fields
1038 .into_iter()
1039 .map(|field| field.as_ref().try_into())
1040 .collect::<Result<Vec<_>, Error>>()
1041}
1042
/// Serializes an Arrow CSV `WriterBuilder` plus a compression setting
/// into the protobuf CSV writer options message.
///
/// Single-byte settings (delimiter, quote, escape) are widened to
/// one-character strings; absent format strings become empty strings.
pub(crate) fn csv_writer_options_to_proto(
    csv_options: &WriterBuilder,
    compression: &CompressionTypeVariant,
) -> protobuf::CsvWriterOptions {
    let compression: protobuf::CompressionTypeVariant = compression.into();
    protobuf::CsvWriterOptions {
        compression: compression.into(),
        delimiter: (csv_options.delimiter() as char).to_string(),
        has_header: csv_options.header(),
        date_format: csv_options.date_format().unwrap_or("").to_owned(),
        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
        time_format: csv_options.time_format().unwrap_or("").to_owned(),
        null_value: csv_options.null().to_owned(),
        quote: (csv_options.quote() as char).to_string(),
        escape: (csv_options.escape() as char).to_string(),
        double_quote: csv_options.double_quote(),
    }
}