1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23 EmptyMessage, arrow_type::ArrowTypeEnum, scalar_value::Value,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::{QuoteStyle, WriterBuilder};
27use arrow::datatypes::{
28 DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29 SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{
32 CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
33};
34use datafusion_common::parsers::CsvQuoteStyle;
35use datafusion_common::{
36 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
37 DataFusionError, JoinSide, ScalarValue, Statistics,
38 config::{
39 CsvOptions, JsonOptions, ParquetCdcOptions, ParquetColumnOptions, ParquetOptions,
40 TableParquetOptions,
41 },
42 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
43 parsers::CompressionTypeVariant,
44 plan_datafusion_err,
45 stats::Precision,
46};
47
/// Errors raised while converting DataFusion / Arrow values into their
/// protobuf counterparts.
#[derive(Debug)]
pub enum Error {
    /// Free-form failure; displayed as `General error: <message>`.
    General(String),

    /// The carried scalar value is reported as invalid as a DataFusion scalar value.
    InvalidScalarValue(ScalarValue),

    /// The carried Arrow data type is reported as invalid as a DataFusion scalar type.
    InvalidScalarType(DataType),

    /// The carried time unit was rejected; the display message states that only
    /// microsecond and nanosecond units are valid.
    InvalidTimeUnit(TimeUnit),

    /// The requested conversion is not implemented; the string names what is missing.
    NotImplemented(String),
}
60
// Marker impl: `Error` relies entirely on the trait's default methods.
impl std::error::Error for Error {}
62
63impl std::fmt::Display for Error {
64 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65 match self {
66 Self::General(desc) => write!(f, "General error: {desc}"),
67 Self::InvalidScalarValue(value) => {
68 write!(f, "{value:?} is invalid as a DataFusion scalar value")
69 }
70 Self::InvalidScalarType(data_type) => {
71 write!(f, "{data_type} is invalid as a DataFusion scalar type")
72 }
73 Self::InvalidTimeUnit(time_unit) => {
74 write!(
75 f,
76 "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
77 )
78 }
79 Self::NotImplemented(s) => {
80 write!(f, "Not implemented: {s}")
81 }
82 }
83 }
84}
85
86impl From<Error> for DataFusionError {
87 fn from(e: Error) -> Self {
88 plan_datafusion_err!("{}", e)
89 }
90}
91
92impl TryFrom<&Field> for protobuf::Field {
93 type Error = Error;
94
95 fn try_from(field: &Field) -> Result<Self, Self::Error> {
96 let arrow_type = field.data_type().try_into()?;
97 Ok(Self {
98 name: field.name().to_owned(),
99 arrow_type: Some(Box::new(arrow_type)),
100 nullable: field.is_nullable(),
101 children: Vec::new(),
102 metadata: field.metadata().clone(),
103 })
104 }
105}
106
107impl TryFrom<&DataType> for protobuf::ArrowType {
108 type Error = Error;
109
110 fn try_from(val: &DataType) -> Result<Self, Self::Error> {
111 let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
112 Ok(Self {
113 arrow_type_enum: Some(arrow_type_enum),
114 })
115 }
116}
117
118impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
119 type Error = Error;
120
121 fn try_from(val: &DataType) -> Result<Self, Self::Error> {
122 let res = match val {
123 DataType::Null => Self::None(EmptyMessage {}),
124 DataType::Boolean => Self::Bool(EmptyMessage {}),
125 DataType::Int8 => Self::Int8(EmptyMessage {}),
126 DataType::Int16 => Self::Int16(EmptyMessage {}),
127 DataType::Int32 => Self::Int32(EmptyMessage {}),
128 DataType::Int64 => Self::Int64(EmptyMessage {}),
129 DataType::UInt8 => Self::Uint8(EmptyMessage {}),
130 DataType::UInt16 => Self::Uint16(EmptyMessage {}),
131 DataType::UInt32 => Self::Uint32(EmptyMessage {}),
132 DataType::UInt64 => Self::Uint64(EmptyMessage {}),
133 DataType::Float16 => Self::Float16(EmptyMessage {}),
134 DataType::Float32 => Self::Float32(EmptyMessage {}),
135 DataType::Float64 => Self::Float64(EmptyMessage {}),
136 DataType::Timestamp(time_unit, timezone) => {
137 Self::Timestamp(protobuf::Timestamp {
138 time_unit: protobuf::TimeUnit::from(time_unit) as i32,
139 timezone: timezone.as_deref().unwrap_or("").to_string(),
140 })
141 }
142 DataType::Date32 => Self::Date32(EmptyMessage {}),
143 DataType::Date64 => Self::Date64(EmptyMessage {}),
144 DataType::Time32(time_unit) => {
145 Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
146 }
147 DataType::Time64(time_unit) => {
148 Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
149 }
150 DataType::Duration(time_unit) => {
151 Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
152 }
153 DataType::Interval(interval_unit) => {
154 Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
155 }
156 DataType::Binary => Self::Binary(EmptyMessage {}),
157 DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
158 DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
159 DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
160 DataType::Utf8 => Self::Utf8(EmptyMessage {}),
161 DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
162 DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
163 DataType::List(item_type) => Self::List(Box::new(protobuf::List {
164 field_type: Some(Box::new(item_type.as_ref().try_into()?)),
165 })),
166 DataType::FixedSizeList(item_type, size) => {
167 Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
168 field_type: Some(Box::new(item_type.as_ref().try_into()?)),
169 list_size: *size,
170 }))
171 }
172 DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
173 field_type: Some(Box::new(item_type.as_ref().try_into()?)),
174 })),
175 DataType::ListView(item_type) => Self::ListView(Box::new(protobuf::List {
176 field_type: Some(Box::new(item_type.as_ref().try_into()?)),
177 })),
178 DataType::LargeListView(item_type) => {
179 Self::LargeListView(Box::new(protobuf::List {
180 field_type: Some(Box::new(item_type.as_ref().try_into()?)),
181 }))
182 }
183 DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
184 sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
185 }),
186 DataType::Union(fields, union_mode) => {
187 let union_mode = match union_mode {
188 UnionMode::Sparse => protobuf::UnionMode::Sparse,
189 UnionMode::Dense => protobuf::UnionMode::Dense,
190 };
191 Self::Union(protobuf::Union {
192 union_types: convert_arc_fields_to_proto_fields(
193 fields.iter().map(|(_, item)| item),
194 )?,
195 union_mode: union_mode.into(),
196 type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
197 })
198 }
199 DataType::Dictionary(key_type, value_type) => {
200 Self::Dictionary(Box::new(protobuf::Dictionary {
201 key: Some(Box::new(key_type.as_ref().try_into()?)),
202 value: Some(Box::new(value_type.as_ref().try_into()?)),
203 }))
204 }
205 DataType::Decimal32(precision, scale) => {
206 Self::Decimal32(protobuf::Decimal32Type {
207 precision: *precision as u32,
208 scale: *scale as i32,
209 })
210 }
211 DataType::Decimal64(precision, scale) => {
212 Self::Decimal64(protobuf::Decimal64Type {
213 precision: *precision as u32,
214 scale: *scale as i32,
215 })
216 }
217 DataType::Decimal128(precision, scale) => {
218 Self::Decimal128(protobuf::Decimal128Type {
219 precision: *precision as u32,
220 scale: *scale as i32,
221 })
222 }
223 DataType::Decimal256(precision, scale) => {
224 Self::Decimal256(protobuf::Decimal256Type {
225 precision: *precision as u32,
226 scale: *scale as i32,
227 })
228 }
229 DataType::Map(field, sorted) => Self::Map(Box::new(protobuf::Map {
230 field_type: Some(Box::new(field.as_ref().try_into()?)),
231 keys_sorted: *sorted,
232 })),
233 DataType::RunEndEncoded(run_ends_field, values_field) => {
234 Self::RunEndEncoded(Box::new(protobuf::RunEndEncoded {
235 run_ends_field: Some(Box::new(run_ends_field.as_ref().try_into()?)),
236 values_field: Some(Box::new(values_field.as_ref().try_into()?)),
237 }))
238 }
239 };
240
241 Ok(res)
242 }
243}
244
245impl From<Column> for protobuf::Column {
246 fn from(c: Column) -> Self {
247 Self {
248 relation: c.relation.map(|relation| protobuf::ColumnRelation {
249 relation: relation.to_string(),
250 }),
251 name: c.name,
252 }
253 }
254}
255
256impl From<&Column> for protobuf::Column {
257 fn from(c: &Column) -> Self {
258 c.clone().into()
259 }
260}
261
262impl TryFrom<&Schema> for protobuf::Schema {
263 type Error = Error;
264
265 fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
266 Ok(Self {
267 columns: convert_arc_fields_to_proto_fields(schema.fields())?,
268 metadata: schema.metadata.clone(),
269 })
270 }
271}
272
273impl TryFrom<SchemaRef> for protobuf::Schema {
274 type Error = Error;
275
276 fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
277 Ok(Self {
278 columns: convert_arc_fields_to_proto_fields(schema.fields())?,
279 metadata: schema.metadata.clone(),
280 })
281 }
282}
283
284impl TryFrom<&DFSchema> for protobuf::DfSchema {
285 type Error = Error;
286
287 fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
288 let columns = s
289 .iter()
290 .map(|(qualifier, field)| {
291 Ok(protobuf::DfField {
292 field: Some(field.as_ref().try_into()?),
293 qualifier: qualifier.map(|r| protobuf::ColumnRelation {
294 relation: r.to_string(),
295 }),
296 })
297 })
298 .collect::<Result<Vec<_>, Error>>()?;
299 Ok(Self {
300 columns,
301 metadata: s.metadata().clone(),
302 })
303 }
304}
305
306impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
307 type Error = Error;
308
309 fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
310 s.as_ref().try_into()
311 }
312}
313
314impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
315 type Error = Error;
316
317 fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
318 let data_type = val.data_type();
319 match val {
320 ScalarValue::Boolean(val) => {
321 create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
322 }
323 ScalarValue::Float16(val) => {
324 create_proto_scalar(val.as_ref(), &data_type, |s| {
325 Value::Float32Value((*s).into())
326 })
327 }
328 ScalarValue::Float32(val) => {
329 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
330 }
331 ScalarValue::Float64(val) => {
332 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
333 }
334 ScalarValue::Int8(val) => {
335 create_proto_scalar(val.as_ref(), &data_type, |s| {
336 Value::Int8Value(*s as i32)
337 })
338 }
339 ScalarValue::Int16(val) => {
340 create_proto_scalar(val.as_ref(), &data_type, |s| {
341 Value::Int16Value(*s as i32)
342 })
343 }
344 ScalarValue::Int32(val) => {
345 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
346 }
347 ScalarValue::Int64(val) => {
348 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
349 }
350 ScalarValue::UInt8(val) => {
351 create_proto_scalar(val.as_ref(), &data_type, |s| {
352 Value::Uint8Value(*s as u32)
353 })
354 }
355 ScalarValue::UInt16(val) => {
356 create_proto_scalar(val.as_ref(), &data_type, |s| {
357 Value::Uint16Value(*s as u32)
358 })
359 }
360 ScalarValue::UInt32(val) => {
361 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
362 }
363 ScalarValue::UInt64(val) => {
364 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
365 }
366 ScalarValue::Utf8(val) => {
367 create_proto_scalar(val.as_ref(), &data_type, |s| {
368 Value::Utf8Value(s.to_owned())
369 })
370 }
371 ScalarValue::LargeUtf8(val) => {
372 create_proto_scalar(val.as_ref(), &data_type, |s| {
373 Value::LargeUtf8Value(s.to_owned())
374 })
375 }
376 ScalarValue::Utf8View(val) => {
377 create_proto_scalar(val.as_ref(), &data_type, |s| {
378 Value::Utf8ViewValue(s.to_owned())
379 })
380 }
381 ScalarValue::List(arr) => {
382 encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
383 }
384 ScalarValue::LargeList(arr) => {
385 encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
386 }
387 ScalarValue::FixedSizeList(arr) => {
388 encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
389 }
390 ScalarValue::ListView(arr) => {
391 encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
392 }
393 ScalarValue::LargeListView(arr) => {
394 encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
395 }
396 ScalarValue::Struct(arr) => {
397 encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
398 }
399 ScalarValue::Map(arr) => {
400 encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
401 }
402 ScalarValue::Date32(val) => {
403 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
404 }
405 ScalarValue::TimestampMicrosecond(val, tz) => {
406 create_proto_scalar(val.as_ref(), &data_type, |s| {
407 Value::TimestampValue(protobuf::ScalarTimestampValue {
408 timezone: tz.as_deref().unwrap_or("").to_string(),
409 value: Some(
410 protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
411 *s,
412 ),
413 ),
414 })
415 })
416 }
417 ScalarValue::TimestampNanosecond(val, tz) => {
418 create_proto_scalar(val.as_ref(), &data_type, |s| {
419 Value::TimestampValue(protobuf::ScalarTimestampValue {
420 timezone: tz.as_deref().unwrap_or("").to_string(),
421 value: Some(
422 protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
423 *s,
424 ),
425 ),
426 })
427 })
428 }
429 ScalarValue::Decimal32(val, p, s) => match *val {
430 Some(v) => {
431 let array = v.to_be_bytes();
432 let vec_val: Vec<u8> = array.to_vec();
433 Ok(protobuf::ScalarValue {
434 value: Some(Value::Decimal32Value(protobuf::Decimal32 {
435 value: vec_val,
436 p: *p as i64,
437 s: *s as i64,
438 })),
439 })
440 }
441 None => Ok(protobuf::ScalarValue {
442 value: Some(protobuf::scalar_value::Value::NullValue(
443 (&data_type).try_into()?,
444 )),
445 }),
446 },
447 ScalarValue::Decimal64(val, p, s) => match *val {
448 Some(v) => {
449 let array = v.to_be_bytes();
450 let vec_val: Vec<u8> = array.to_vec();
451 Ok(protobuf::ScalarValue {
452 value: Some(Value::Decimal64Value(protobuf::Decimal64 {
453 value: vec_val,
454 p: *p as i64,
455 s: *s as i64,
456 })),
457 })
458 }
459 None => Ok(protobuf::ScalarValue {
460 value: Some(protobuf::scalar_value::Value::NullValue(
461 (&data_type).try_into()?,
462 )),
463 }),
464 },
465 ScalarValue::Decimal128(val, p, s) => match *val {
466 Some(v) => {
467 let array = v.to_be_bytes();
468 let vec_val: Vec<u8> = array.to_vec();
469 Ok(protobuf::ScalarValue {
470 value: Some(Value::Decimal128Value(protobuf::Decimal128 {
471 value: vec_val,
472 p: *p as i64,
473 s: *s as i64,
474 })),
475 })
476 }
477 None => Ok(protobuf::ScalarValue {
478 value: Some(protobuf::scalar_value::Value::NullValue(
479 (&data_type).try_into()?,
480 )),
481 }),
482 },
483 ScalarValue::Decimal256(val, p, s) => match *val {
484 Some(v) => {
485 let array = v.to_be_bytes();
486 let vec_val: Vec<u8> = array.to_vec();
487 Ok(protobuf::ScalarValue {
488 value: Some(Value::Decimal256Value(protobuf::Decimal256 {
489 value: vec_val,
490 p: *p as i64,
491 s: *s as i64,
492 })),
493 })
494 }
495 None => Ok(protobuf::ScalarValue {
496 value: Some(protobuf::scalar_value::Value::NullValue(
497 (&data_type).try_into()?,
498 )),
499 }),
500 },
501 ScalarValue::Date64(val) => {
502 create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
503 }
504 ScalarValue::TimestampSecond(val, tz) => {
505 create_proto_scalar(val.as_ref(), &data_type, |s| {
506 Value::TimestampValue(protobuf::ScalarTimestampValue {
507 timezone: tz.as_deref().unwrap_or("").to_string(),
508 value: Some(
509 protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
510 ),
511 })
512 })
513 }
514 ScalarValue::TimestampMillisecond(val, tz) => {
515 create_proto_scalar(val.as_ref(), &data_type, |s| {
516 Value::TimestampValue(protobuf::ScalarTimestampValue {
517 timezone: tz.as_deref().unwrap_or("").to_string(),
518 value: Some(
519 protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
520 *s,
521 ),
522 ),
523 })
524 })
525 }
526 ScalarValue::IntervalYearMonth(val) => {
527 create_proto_scalar(val.as_ref(), &data_type, |s| {
528 Value::IntervalYearmonthValue(*s)
529 })
530 }
531 ScalarValue::Null => Ok(protobuf::ScalarValue {
532 value: Some(Value::NullValue((&data_type).try_into()?)),
533 }),
534
535 ScalarValue::Binary(val) => {
536 create_proto_scalar(val.as_ref(), &data_type, |s| {
537 Value::BinaryValue(s.to_owned())
538 })
539 }
540 ScalarValue::BinaryView(val) => {
541 create_proto_scalar(val.as_ref(), &data_type, |s| {
542 Value::BinaryViewValue(s.to_owned())
543 })
544 }
545 ScalarValue::LargeBinary(val) => {
546 create_proto_scalar(val.as_ref(), &data_type, |s| {
547 Value::LargeBinaryValue(s.to_owned())
548 })
549 }
550 ScalarValue::FixedSizeBinary(length, val) => {
551 create_proto_scalar(val.as_ref(), &data_type, |s| {
552 Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
553 values: s.to_owned(),
554 length: *length,
555 })
556 })
557 }
558
559 ScalarValue::Time32Second(v) => {
560 create_proto_scalar(v.as_ref(), &data_type, |v| {
561 Value::Time32Value(protobuf::ScalarTime32Value {
562 value: Some(
563 protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
564 ),
565 })
566 })
567 }
568
569 ScalarValue::Time32Millisecond(v) => {
570 create_proto_scalar(v.as_ref(), &data_type, |v| {
571 Value::Time32Value(protobuf::ScalarTime32Value {
572 value: Some(
573 protobuf::scalar_time32_value::Value::Time32MillisecondValue(
574 *v,
575 ),
576 ),
577 })
578 })
579 }
580
581 ScalarValue::Time64Microsecond(v) => {
582 create_proto_scalar(v.as_ref(), &data_type, |v| {
583 Value::Time64Value(protobuf::ScalarTime64Value {
584 value: Some(
585 protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
586 *v,
587 ),
588 ),
589 })
590 })
591 }
592
593 ScalarValue::Time64Nanosecond(v) => {
594 create_proto_scalar(v.as_ref(), &data_type, |v| {
595 Value::Time64Value(protobuf::ScalarTime64Value {
596 value: Some(
597 protobuf::scalar_time64_value::Value::Time64NanosecondValue(
598 *v,
599 ),
600 ),
601 })
602 })
603 }
604
605 ScalarValue::IntervalDayTime(val) => {
606 let value = if let Some(v) = val {
607 let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
608 Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
609 days,
610 milliseconds,
611 })
612 } else {
613 Value::NullValue((&data_type).try_into()?)
614 };
615
616 Ok(protobuf::ScalarValue { value: Some(value) })
617 }
618
619 ScalarValue::IntervalMonthDayNano(v) => {
620 let value = if let Some(v) = v {
621 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
622 Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
623 months,
624 days,
625 nanos,
626 })
627 } else {
628 Value::NullValue((&data_type).try_into()?)
629 };
630
631 Ok(protobuf::ScalarValue { value: Some(value) })
632 }
633
634 ScalarValue::DurationSecond(v) => {
635 let value = match v {
636 Some(v) => Value::DurationSecondValue(*v),
637 None => Value::NullValue((&data_type).try_into()?),
638 };
639 Ok(protobuf::ScalarValue { value: Some(value) })
640 }
641 ScalarValue::DurationMillisecond(v) => {
642 let value = match v {
643 Some(v) => Value::DurationMillisecondValue(*v),
644 None => Value::NullValue((&data_type).try_into()?),
645 };
646 Ok(protobuf::ScalarValue { value: Some(value) })
647 }
648 ScalarValue::DurationMicrosecond(v) => {
649 let value = match v {
650 Some(v) => Value::DurationMicrosecondValue(*v),
651 None => Value::NullValue((&data_type).try_into()?),
652 };
653 Ok(protobuf::ScalarValue { value: Some(value) })
654 }
655 ScalarValue::DurationNanosecond(v) => {
656 let value = match v {
657 Some(v) => Value::DurationNanosecondValue(*v),
658 None => Value::NullValue((&data_type).try_into()?),
659 };
660 Ok(protobuf::ScalarValue { value: Some(value) })
661 }
662
663 ScalarValue::Union(val, df_fields, mode) => {
664 let mut fields =
665 Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
666 for (id, field) in df_fields.iter() {
667 let field_id = id as i32;
668 let field = Some(field.as_ref().try_into()?);
669 let field = protobuf::UnionField { field_id, field };
670 fields.push(field);
671 }
672 let mode = match mode {
673 UnionMode::Sparse => 0,
674 UnionMode::Dense => 1,
675 };
676 let value = match val {
677 None => None,
678 Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
679 };
680 let val = protobuf::UnionValue {
681 value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
682 value,
683 fields,
684 mode,
685 };
686 let val = Value::UnionValue(Box::new(val));
687 let val = protobuf::ScalarValue { value: Some(val) };
688 Ok(val)
689 }
690
691 ScalarValue::Dictionary(index_type, val) => {
692 let value: protobuf::ScalarValue = val.as_ref().try_into()?;
693 Ok(protobuf::ScalarValue {
694 value: Some(Value::DictionaryValue(Box::new(
695 protobuf::ScalarDictionaryValue {
696 index_type: Some(index_type.as_ref().try_into()?),
697 value: Some(Box::new(value)),
698 },
699 ))),
700 })
701 }
702
703 ScalarValue::RunEndEncoded(run_ends_field, values_field, val) => {
704 Ok(protobuf::ScalarValue {
705 value: Some(Value::RunEndEncodedValue(Box::new(
706 protobuf::ScalarRunEndEncodedValue {
707 run_ends_field: Some(run_ends_field.as_ref().try_into()?),
708 values_field: Some(values_field.as_ref().try_into()?),
709 value: Some(Box::new(val.as_ref().try_into()?)),
710 },
711 ))),
712 })
713 }
714 }
715 }
716}
717
718impl From<&TimeUnit> for protobuf::TimeUnit {
719 fn from(val: &TimeUnit) -> Self {
720 match val {
721 TimeUnit::Second => protobuf::TimeUnit::Second,
722 TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
723 TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
724 TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
725 }
726 }
727}
728
729impl From<&IntervalUnit> for protobuf::IntervalUnit {
730 fn from(interval_unit: &IntervalUnit) -> Self {
731 match interval_unit {
732 IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
733 IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
734 IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
735 }
736 }
737}
738
739impl From<Constraints> for protobuf::Constraints {
740 fn from(value: Constraints) -> Self {
741 let constraints = value.into_iter().map(|item| item.into()).collect();
742 protobuf::Constraints { constraints }
743 }
744}
745
746impl From<Constraint> for protobuf::Constraint {
747 fn from(value: Constraint) -> Self {
748 let res = match value {
749 Constraint::PrimaryKey(indices) => {
750 let indices = indices.into_iter().map(|item| item as u64).collect();
751 protobuf::constraint::ConstraintMode::PrimaryKey(
752 protobuf::PrimaryKeyConstraint { indices },
753 )
754 }
755 Constraint::Unique(indices) => {
756 let indices = indices.into_iter().map(|item| item as u64).collect();
757 protobuf::constraint::ConstraintMode::PrimaryKey(
758 protobuf::PrimaryKeyConstraint { indices },
759 )
760 }
761 };
762 protobuf::Constraint {
763 constraint_mode: Some(res),
764 }
765 }
766}
767
768impl From<&Precision<usize>> for protobuf::Precision {
769 fn from(s: &Precision<usize>) -> protobuf::Precision {
770 match s {
771 Precision::Exact(val) => protobuf::Precision {
772 precision_info: protobuf::PrecisionInfo::Exact.into(),
773 val: Some(crate::protobuf_common::ScalarValue {
774 value: Some(Value::Uint64Value(*val as u64)),
775 }),
776 },
777 Precision::Inexact(val) => protobuf::Precision {
778 precision_info: protobuf::PrecisionInfo::Inexact.into(),
779 val: Some(crate::protobuf_common::ScalarValue {
780 value: Some(Value::Uint64Value(*val as u64)),
781 }),
782 },
783 Precision::Absent => protobuf::Precision {
784 precision_info: protobuf::PrecisionInfo::Absent.into(),
785 val: Some(crate::protobuf_common::ScalarValue { value: None }),
786 },
787 }
788 }
789}
790
791impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
792 fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
793 match s {
794 Precision::Exact(val) => protobuf::Precision {
795 precision_info: protobuf::PrecisionInfo::Exact.into(),
796 val: val.try_into().ok(),
797 },
798 Precision::Inexact(val) => protobuf::Precision {
799 precision_info: protobuf::PrecisionInfo::Inexact.into(),
800 val: val.try_into().ok(),
801 },
802 Precision::Absent => protobuf::Precision {
803 precision_info: protobuf::PrecisionInfo::Absent.into(),
804 val: Some(crate::protobuf_common::ScalarValue { value: None }),
805 },
806 }
807 }
808}
809
810impl From<&Statistics> for protobuf::Statistics {
811 fn from(s: &Statistics) -> protobuf::Statistics {
812 let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
813 protobuf::Statistics {
814 num_rows: Some(protobuf::Precision::from(&s.num_rows)),
815 total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
816 column_stats,
817 }
818 }
819}
820
821impl From<&ColumnStatistics> for protobuf::ColumnStats {
822 fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
823 protobuf::ColumnStats {
824 min_value: Some(protobuf::Precision::from(&s.min_value)),
825 max_value: Some(protobuf::Precision::from(&s.max_value)),
826 sum_value: Some(protobuf::Precision::from(&s.sum_value)),
827 null_count: Some(protobuf::Precision::from(&s.null_count)),
828 distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
829 byte_size: Some(protobuf::Precision::from(&s.byte_size)),
830 }
831 }
832}
833
834impl From<JoinSide> for protobuf::JoinSide {
835 fn from(t: JoinSide) -> Self {
836 match t {
837 JoinSide::Left => protobuf::JoinSide::LeftSide,
838 JoinSide::Right => protobuf::JoinSide::RightSide,
839 JoinSide::None => protobuf::JoinSide::None,
840 }
841 }
842}
843
844impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
845 fn from(value: &CompressionTypeVariant) -> Self {
846 match value {
847 CompressionTypeVariant::GZIP => Self::Gzip,
848 CompressionTypeVariant::BZIP2 => Self::Bzip2,
849 CompressionTypeVariant::XZ => Self::Xz,
850 CompressionTypeVariant::ZSTD => Self::Zstd,
851 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
852 }
853 }
854}
855
856impl From<CsvQuoteStyle> for protobuf::CsvQuoteStyle {
857 fn from(value: CsvQuoteStyle) -> Self {
858 match value {
859 CsvQuoteStyle::Necessary => Self::Necessary,
860 CsvQuoteStyle::Always => Self::Always,
861 CsvQuoteStyle::NonNumeric => Self::NonNumeric,
862 CsvQuoteStyle::Never => Self::Never,
863 }
864 }
865}
866
867impl From<QuoteStyle> for protobuf::CsvQuoteStyle {
868 fn from(value: QuoteStyle) -> Self {
869 match value {
870 QuoteStyle::Necessary => Self::Necessary,
871 QuoteStyle::Always => Self::Always,
872 QuoteStyle::NonNumeric => Self::NonNumeric,
873 QuoteStyle::Never => Self::Never,
874 _ => Self::Necessary,
875 }
876 }
877}
878
879impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
880 type Error = DataFusionError;
881
882 fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
883 Ok(csv_writer_options_to_proto(
884 &opts.writer_options,
885 &opts.compression,
886 ))
887 }
888}
889
890impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
891 type Error = DataFusionError;
892
893 fn try_from(
894 opts: &JsonWriterOptions,
895 ) -> datafusion_common::Result<Self, Self::Error> {
896 let compression: protobuf::CompressionTypeVariant = opts.compression.into();
897 Ok(protobuf::JsonWriterOptions {
898 compression: compression.into(),
899 })
900 }
901}
902
903impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
904 type Error = DataFusionError;
905
906 fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
907 Ok(protobuf::ParquetOptions {
908 enable_page_index: value.enable_page_index,
909 pruning: value.pruning,
910 skip_metadata: value.skip_metadata,
911 metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
912 pushdown_filters: value.pushdown_filters,
913 reorder_filters: value.reorder_filters,
914 force_filter_selections: value.force_filter_selections,
915 data_pagesize_limit: value.data_pagesize_limit as u64,
916 write_batch_size: value.write_batch_size as u64,
917 writer_version: value.writer_version.to_string(),
918 compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
919 dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
920 dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
921 statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
922 max_row_group_size: value.max_row_group_size as u64,
923 created_by: value.created_by.clone(),
924 column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
925 statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
926 data_page_row_count_limit: value.data_page_row_count_limit as u64,
927 encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
928 bloom_filter_on_read: value.bloom_filter_on_read,
929 bloom_filter_on_write: value.bloom_filter_on_write,
930 bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
931 bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
932 allow_single_file_parallelism: value.allow_single_file_parallelism,
933 maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
934 maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
935 schema_force_view_types: value.schema_force_view_types,
936 binary_as_string: value.binary_as_string,
937 skip_arrow_metadata: value.skip_arrow_metadata,
938 coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
939 coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz),
940 max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
941 content_defined_chunking: Some((&value.content_defined_chunking).into()),
942 })
943 }
944}
945
946impl From<&ParquetCdcOptions> for protobuf::ParquetCdcOptions {
947 fn from(value: &ParquetCdcOptions) -> Self {
948 protobuf::ParquetCdcOptions {
949 enabled: value.enabled,
950 min_chunk_size: value.min_chunk_size as u64,
951 max_chunk_size: value.max_chunk_size as u64,
952 norm_level: value.norm_level,
953 }
954 }
955}
956
957impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
958 type Error = DataFusionError;
959
960 fn try_from(
961 value: &ParquetColumnOptions,
962 ) -> datafusion_common::Result<Self, Self::Error> {
963 Ok(protobuf::ParquetColumnOptions {
964 compression_opt: value
965 .compression
966 .clone()
967 .map(protobuf::parquet_column_options::CompressionOpt::Compression),
968 dictionary_enabled_opt: value
969 .dictionary_enabled
970 .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
971 statistics_enabled_opt: value
972 .statistics_enabled
973 .clone()
974 .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
975 encoding_opt: value
976 .encoding
977 .clone()
978 .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
979 bloom_filter_enabled_opt: value
980 .bloom_filter_enabled
981 .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
982 bloom_filter_fpp_opt: value
983 .bloom_filter_fpp
984 .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
985 bloom_filter_ndv_opt: value
986 .bloom_filter_ndv
987 .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
988 })
989 }
990}
991
992impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
993 type Error = DataFusionError;
994 fn try_from(
995 value: &TableParquetOptions,
996 ) -> datafusion_common::Result<Self, Self::Error> {
997 let column_specific_options = value
998 .column_specific_options
999 .iter()
1000 .map(|(k, v)| {
1001 Ok(protobuf::ParquetColumnSpecificOptions {
1002 column_name: k.into(),
1003 options: Some(v.try_into()?),
1004 })
1005 })
1006 .collect::<datafusion_common::Result<Vec<_>>>()?;
1007 let key_value_metadata = value
1008 .key_value_metadata
1009 .iter()
1010 .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
1011 .collect::<HashMap<String, String>>();
1012
1013 let global: protobuf::ParquetOptions = (&value.global).try_into()?;
1014
1015 Ok(protobuf::TableParquetOptions {
1016 global: Some(global),
1017 column_specific_options,
1018 key_value_metadata,
1019 })
1020 }
1021}
1022
1023impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
1024 type Error = DataFusionError; fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
1027 let compression: protobuf::CompressionTypeVariant = opts.compression.into();
1028 let quote_style: protobuf::CsvQuoteStyle = opts.quote_style.into();
1029 Ok(protobuf::CsvOptions {
1030 has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
1031 delimiter: vec![opts.delimiter],
1032 quote: vec![opts.quote],
1033 terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
1034 escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
1035 double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
1036 newlines_in_values: opts
1037 .newlines_in_values
1038 .map_or_else(Vec::new, |h| vec![h as u8]),
1039 compression: compression.into(),
1040 schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
1041 date_format: opts.date_format.clone().unwrap_or_default(),
1042 datetime_format: opts.datetime_format.clone().unwrap_or_default(),
1043 timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
1044 timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
1045 time_format: opts.time_format.clone().unwrap_or_default(),
1046 null_value: opts.null_value.clone().unwrap_or_default(),
1047 null_regex: opts.null_regex.clone().unwrap_or_default(),
1048 comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
1049 truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]),
1050 compression_level: opts.compression_level,
1051 quote_style: quote_style.into(),
1052 ignore_leading_whitespace: opts
1053 .ignore_leading_whitespace
1054 .map_or_else(Vec::new, |h| vec![h as u8]),
1055 ignore_trailing_whitespace: opts
1056 .ignore_trailing_whitespace
1057 .map_or_else(Vec::new, |h| vec![h as u8]),
1058 })
1059 }
1060}
1061
1062impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
1063 type Error = DataFusionError;
1064
1065 fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
1066 let compression: protobuf::CompressionTypeVariant = opts.compression.into();
1067 Ok(protobuf::JsonOptions {
1068 compression: compression.into(),
1069 schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
1070 compression_level: opts.compression_level,
1071 newline_delimited: Some(opts.newline_delimited),
1072 })
1073 }
1074}
1075
1076fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
1079 v: Option<&I>,
1080 null_arrow_type: &DataType,
1081 constructor: T,
1082) -> Result<protobuf::ScalarValue, Error> {
1083 let value = v
1084 .map(constructor)
1085 .unwrap_or(protobuf::scalar_value::Value::NullValue(
1086 null_arrow_type.try_into()?,
1087 ));
1088
1089 Ok(protobuf::ScalarValue { value: Some(value) })
1090}
1091
1092fn encode_scalar_nested_value(
1095 arr: ArrayRef,
1096 val: &ScalarValue,
1097) -> Result<protobuf::ScalarValue, Error> {
1098 let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
1099 Error::General(format!(
1100 "Error creating temporary batch while encoding nested ScalarValue: {e}"
1101 ))
1102 })?;
1103
1104 let ipc_gen = IpcDataGenerator {};
1105 let mut dict_tracker = DictionaryTracker::new(false);
1106 let write_options = IpcWriteOptions::default();
1107 ipc_gen.schema_to_bytes_with_dictionary_tracker(
1110 batch.schema().as_ref(),
1111 &mut dict_tracker,
1112 &write_options,
1113 );
1114 let mut compression_context = CompressionContext::default();
1115 let (encoded_dictionaries, encoded_message) = ipc_gen
1116 .encode(
1117 &batch,
1118 &mut dict_tracker,
1119 &write_options,
1120 &mut compression_context,
1121 )
1122 .map_err(|e| {
1123 Error::General(format!("Error encoding nested ScalarValue as IPC: {e}"))
1124 })?;
1125
1126 let schema: protobuf::Schema = batch.schema().try_into()?;
1127
1128 let scalar_list_value = protobuf::ScalarNestedValue {
1129 ipc_message: encoded_message.ipc_message,
1130 arrow_data: encoded_message.arrow_data,
1131 dictionaries: encoded_dictionaries
1132 .into_iter()
1133 .map(|data| protobuf::scalar_nested_value::Dictionary {
1134 ipc_message: data.ipc_message,
1135 arrow_data: data.arrow_data,
1136 })
1137 .collect(),
1138 schema: Some(schema),
1139 };
1140
1141 match val {
1142 ScalarValue::List(_) => Ok(protobuf::ScalarValue {
1143 value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
1144 }),
1145 ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
1146 value: Some(protobuf::scalar_value::Value::LargeListValue(
1147 scalar_list_value,
1148 )),
1149 }),
1150 ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
1151 value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
1152 scalar_list_value,
1153 )),
1154 }),
1155 ScalarValue::ListView(_) => Ok(protobuf::ScalarValue {
1156 value: Some(protobuf::scalar_value::Value::ListViewValue(
1157 scalar_list_value,
1158 )),
1159 }),
1160 ScalarValue::LargeListView(_) => Ok(protobuf::ScalarValue {
1161 value: Some(protobuf::scalar_value::Value::LargeListViewValue(
1162 scalar_list_value,
1163 )),
1164 }),
1165 ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
1166 value: Some(protobuf::scalar_value::Value::StructValue(
1167 scalar_list_value,
1168 )),
1169 }),
1170 ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
1171 value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
1172 }),
1173 _ => unreachable!(),
1174 }
1175}
1176
1177fn convert_arc_fields_to_proto_fields<'a, I>(
1179 fields: I,
1180) -> Result<Vec<protobuf::Field>, Error>
1181where
1182 I: IntoIterator<Item = &'a Arc<Field>>,
1183{
1184 fields
1185 .into_iter()
1186 .map(|field| field.as_ref().try_into())
1187 .collect::<Result<Vec<_>, Error>>()
1188}
1189
1190pub(crate) fn csv_writer_options_to_proto(
1191 csv_options: &WriterBuilder,
1192 compression: &CompressionTypeVariant,
1193) -> protobuf::CsvWriterOptions {
1194 let compression: protobuf::CompressionTypeVariant = compression.into();
1195 let quote_style: protobuf::CsvQuoteStyle = csv_options.quote_style().into();
1196 protobuf::CsvWriterOptions {
1197 compression: compression.into(),
1198 delimiter: (csv_options.delimiter() as char).to_string(),
1199 has_header: csv_options.header(),
1200 date_format: csv_options.date_format().unwrap_or("").to_owned(),
1201 datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
1202 timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
1203 time_format: csv_options.time_format().unwrap_or("").to_owned(),
1204 null_value: csv_options.null().to_owned(),
1205 quote: (csv_options.quote() as char).to_string(),
1206 escape: (csv_options.escape() as char).to_string(),
1207 double_quote: csv_options.double_quote(),
1208 quote_style: quote_style.into(),
1209 ignore_leading_whitespace: csv_options.ignore_leading_whitespace(),
1210 ignore_trailing_whitespace: csv_options.ignore_trailing_whitespace(),
1211 }
1212}