1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::protobuf_common as protobuf;
22use crate::protobuf_common::{
23 arrow_type::ArrowTypeEnum, scalar_value::Value, EmptyMessage,
24};
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28 DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29 SchemaRef, TimeUnit, UnionMode,
30};
31use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
32use datafusion_common::{
33 config::{
34 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
35 TableParquetOptions,
36 },
37 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
38 parsers::CompressionTypeVariant,
39 plan_datafusion_err,
40 stats::Precision,
41 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
42 DataFusionError, JoinSide, ScalarValue, Statistics,
43};
44
/// Errors that can arise while serializing DataFusion / Arrow types
/// into their protobuf representations.
#[derive(Debug)]
pub enum Error {
    /// Catch-all serialization failure with a free-form description.
    General(String),

    /// The given scalar value cannot be encoded as a protobuf scalar.
    InvalidScalarValue(ScalarValue),

    /// The given data type is not valid as a DataFusion scalar type.
    InvalidScalarType(DataType),

    /// A time unit that is not supported in the requested context
    /// (see `Display`: only Microsecond/Nanosecond are accepted there).
    InvalidTimeUnit(TimeUnit),

    /// Serialization of this construct is recognized but not implemented.
    NotImplemented(String),
}
57
// `Error` derives `Debug` and implements `Display` below, so the default
// `std::error::Error` trait methods are sufficient.
impl std::error::Error for Error {}
59
60impl std::fmt::Display for Error {
61 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62 match self {
63 Self::General(desc) => write!(f, "General error: {desc}"),
64 Self::InvalidScalarValue(value) => {
65 write!(f, "{value:?} is invalid as a DataFusion scalar value")
66 }
67 Self::InvalidScalarType(data_type) => {
68 write!(f, "{data_type:?} is invalid as a DataFusion scalar type")
69 }
70 Self::InvalidTimeUnit(time_unit) => {
71 write!(
72 f,
73 "Only TimeUnit::Microsecond and TimeUnit::Nanosecond are valid time units, found: {time_unit:?}"
74 )
75 }
76 Self::NotImplemented(s) => {
77 write!(f, "Not implemented: {s}")
78 }
79 }
80 }
81}
82
// Allow `?` to promote serialization errors into DataFusion's error type
// (wrapped as a planning error via `plan_datafusion_err!`).
impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
88
impl TryFrom<&Field> for protobuf::Field {
    type Error = Error;

    /// Serialize an Arrow [`Field`] (name, type, nullability, metadata,
    /// dictionary ordering) into its protobuf message.
    ///
    /// Fails if the field's data type cannot be serialized.
    fn try_from(field: &Field) -> Result<Self, Self::Error> {
        let arrow_type = field.data_type().try_into()?;
        Ok(Self {
            name: field.name().to_owned(),
            arrow_type: Some(Box::new(arrow_type)),
            nullable: field.is_nullable(),
            // Child fields are encoded inside `arrow_type` itself, so this
            // list is intentionally left empty here.
            children: Vec::new(),
            metadata: field.metadata().clone(),
            // `dict_is_ordered` is `None` for non-dictionary types; treat
            // that as "not ordered".
            dict_ordered: field.dict_is_ordered().unwrap_or(false),
        })
    }
}
104
impl TryFrom<&DataType> for protobuf::ArrowType {
    type Error = Error;

    /// Wrap the serialized [`ArrowTypeEnum`] in the outer `ArrowType`
    /// protobuf message.
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
        Ok(Self {
            arrow_type_enum: Some(arrow_type_enum),
        })
    }
}
115
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
    type Error = Error;

    /// Serialize an Arrow [`DataType`] into the protobuf `ArrowTypeEnum`
    /// oneof. Primitive types map to empty marker messages; parameterized
    /// types (timestamps, lists, structs, unions, …) carry their parameters
    /// and recurse into nested field types.
    ///
    /// Returns `Error::General` for `RunEndEncoded`, `ListView` and
    /// `LargeListView`, which are not yet supported.
    fn try_from(val: &DataType) -> Result<Self, Self::Error> {
        let res = match val {
            DataType::Null => Self::None(EmptyMessage {}),
            DataType::Boolean => Self::Bool(EmptyMessage {}),
            DataType::Int8 => Self::Int8(EmptyMessage {}),
            DataType::Int16 => Self::Int16(EmptyMessage {}),
            DataType::Int32 => Self::Int32(EmptyMessage {}),
            DataType::Int64 => Self::Int64(EmptyMessage {}),
            DataType::UInt8 => Self::Uint8(EmptyMessage {}),
            DataType::UInt16 => Self::Uint16(EmptyMessage {}),
            DataType::UInt32 => Self::Uint32(EmptyMessage {}),
            DataType::UInt64 => Self::Uint64(EmptyMessage {}),
            DataType::Float16 => Self::Float16(EmptyMessage {}),
            DataType::Float32 => Self::Float32(EmptyMessage {}),
            DataType::Float64 => Self::Float64(EmptyMessage {}),
            DataType::Timestamp(time_unit, timezone) => {
                Self::Timestamp(protobuf::Timestamp {
                    time_unit: protobuf::TimeUnit::from(time_unit) as i32,
                    // A missing timezone is encoded as the empty string.
                    timezone: timezone.as_deref().unwrap_or("").to_string(),
                })
            }
            DataType::Date32 => Self::Date32(EmptyMessage {}),
            DataType::Date64 => Self::Date64(EmptyMessage {}),
            DataType::Time32(time_unit) => {
                Self::Time32(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Time64(time_unit) => {
                Self::Time64(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Duration(time_unit) => {
                Self::Duration(protobuf::TimeUnit::from(time_unit) as i32)
            }
            DataType::Interval(interval_unit) => {
                Self::Interval(protobuf::IntervalUnit::from(interval_unit) as i32)
            }
            DataType::Binary => Self::Binary(EmptyMessage {}),
            DataType::BinaryView => Self::BinaryView(EmptyMessage {}),
            DataType::FixedSizeBinary(size) => Self::FixedSizeBinary(*size),
            DataType::LargeBinary => Self::LargeBinary(EmptyMessage {}),
            DataType::Utf8 => Self::Utf8(EmptyMessage {}),
            DataType::Utf8View => Self::Utf8View(EmptyMessage {}),
            DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
            // List-like types recurse into the element field.
            DataType::List(item_type) => Self::List(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::FixedSizeList(item_type, size) => {
                Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
                    field_type: Some(Box::new(item_type.as_ref().try_into()?)),
                    list_size: *size,
                }))
            }
            DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
                field_type: Some(Box::new(item_type.as_ref().try_into()?)),
            })),
            DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
                sub_field_types: convert_arc_fields_to_proto_fields(struct_fields)?,
            }),
            DataType::Union(fields, union_mode) => {
                let union_mode = match union_mode {
                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
                    UnionMode::Dense => protobuf::UnionMode::Dense,
                };
                Self::Union(protobuf::Union {
                    union_types: convert_arc_fields_to_proto_fields(fields.iter().map(|(_, item)|item))?,
                    union_mode: union_mode.into(),
                    // Union type ids are i8 in Arrow; widen to i32 for proto.
                    type_ids: fields.iter().map(|(x, _)| x as i32).collect(),
                })
            }
            DataType::Dictionary(key_type, value_type) => {
                Self::Dictionary(Box::new(protobuf::Dictionary {
                    key: Some(Box::new(key_type.as_ref().try_into()?)),
                    value: Some(Box::new(value_type.as_ref().try_into()?)),
                }))
            }
            DataType::Decimal128(precision, scale) => Self::Decimal(protobuf::Decimal {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Decimal256(precision, scale) => Self::Decimal256(protobuf::Decimal256Type {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            DataType::Map(field, sorted) => {
                Self::Map(Box::new(
                    protobuf::Map {
                        field_type: Some(Box::new(field.as_ref().try_into()?)),
                        keys_sorted: *sorted,
                    }
                ))
            }
            DataType::RunEndEncoded(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                ))
            }
            DataType::ListView(_) | DataType::LargeListView(_) => {
                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
            }
        };

        Ok(res)
    }
}
222
223impl From<Column> for protobuf::Column {
224 fn from(c: Column) -> Self {
225 Self {
226 relation: c.relation.map(|relation| protobuf::ColumnRelation {
227 relation: relation.to_string(),
228 }),
229 name: c.name,
230 }
231 }
232}
233
// Borrowed variant: clones the column and delegates to the owned impl.
impl From<&Column> for protobuf::Column {
    fn from(c: &Column) -> Self {
        c.clone().into()
    }
}
239
impl TryFrom<&Schema> for protobuf::Schema {
    type Error = Error;

    /// Serialize an Arrow [`Schema`]: all fields plus schema-level metadata.
    fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}
250
impl TryFrom<SchemaRef> for protobuf::Schema {
    type Error = Error;

    /// Same as the `&Schema` impl, but accepts an `Arc<Schema>` directly.
    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
        Ok(Self {
            columns: convert_arc_fields_to_proto_fields(schema.fields())?,
            metadata: schema.metadata.clone(),
        })
    }
}
261
impl TryFrom<&DFSchema> for protobuf::DfSchema {
    type Error = Error;

    /// Serialize a [`DFSchema`]: each (qualifier, field) pair becomes a
    /// `DfField` message, preserving the optional table qualifier.
    fn try_from(s: &DFSchema) -> Result<Self, Self::Error> {
        let columns = s
            .iter()
            .map(|(qualifier, field)| {
                Ok(protobuf::DfField {
                    field: Some(field.as_ref().try_into()?),
                    qualifier: qualifier.map(|r| protobuf::ColumnRelation {
                        relation: r.to_string(),
                    }),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok(Self {
            columns,
            metadata: s.metadata().clone(),
        })
    }
}
283
// Delegates to the `&DFSchema` impl through the `Arc` deref.
impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
    type Error = Error;

    fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
        s.as_ref().try_into()
    }
}
291
impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
    type Error = Error;

    /// Serialize a DataFusion [`ScalarValue`] into its protobuf message.
    ///
    /// Most primitive variants go through [`create_proto_scalar`], which
    /// encodes `None` as a typed `NullValue`. Nested variants (List,
    /// LargeList, FixedSizeList, Struct, Map) are encoded as Arrow IPC
    /// payloads via [`encode_scalar_nested_value`].
    fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
        let data_type = val.data_type();
        match val {
            ScalarValue::Boolean(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
            }
            // Float16 has no dedicated proto field; it is widened to f32.
            ScalarValue::Float16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Float32Value((*s).into())
                })
            }
            ScalarValue::Float32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
            }
            ScalarValue::Float64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
            }
            // 8/16-bit integers are widened to 32-bit proto fields.
            ScalarValue::Int8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int8Value(*s as i32)
                })
            }
            ScalarValue::Int16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Int16Value(*s as i32)
                })
            }
            ScalarValue::Int32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
            }
            ScalarValue::Int64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
            }
            ScalarValue::UInt8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint8Value(*s as u32)
                })
            }
            ScalarValue::UInt16(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Uint16Value(*s as u32)
                })
            }
            ScalarValue::UInt32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
            }
            ScalarValue::UInt64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
            }
            ScalarValue::Utf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8Value(s.to_owned())
                })
            }
            ScalarValue::LargeUtf8(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeUtf8Value(s.to_owned())
                })
            }
            ScalarValue::Utf8View(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::Utf8ViewValue(s.to_owned())
                })
            }
            // Nested values are serialized as Arrow IPC record batches.
            ScalarValue::List(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::LargeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::FixedSizeList(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Struct(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Map(arr) => {
                encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
            }
            ScalarValue::Date32(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
            }
            // Timestamps carry their timezone ("" when absent) plus the raw
            // integer value tagged with its time unit.
            ScalarValue::TimestampMicrosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::TimestampNanosecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            // Decimal values are shipped as big-endian bytes of the i128,
            // together with precision and scale.
            ScalarValue::Decimal128(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal128Value(protobuf::Decimal128 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Decimal256(val, p, s) => match *val {
                Some(v) => {
                    let array = v.to_be_bytes();
                    let vec_val: Vec<u8> = array.to_vec();
                    Ok(protobuf::ScalarValue {
                        value: Some(Value::Decimal256Value(protobuf::Decimal256 {
                            value: vec_val,
                            p: *p as i64,
                            s: *s as i64,
                        })),
                    })
                }
                None => Ok(protobuf::ScalarValue {
                    value: Some(protobuf::scalar_value::Value::NullValue(
                        (&data_type).try_into()?,
                    )),
                }),
            },
            ScalarValue::Date64(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
            }
            ScalarValue::TimestampSecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s),
                        ),
                    })
                })
            }
            ScalarValue::TimestampMillisecond(val, tz) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::TimestampValue(protobuf::ScalarTimestampValue {
                        timezone: tz.as_deref().unwrap_or("").to_string(),
                        value: Some(
                            protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(
                                *s,
                            ),
                        ),
                    })
                })
            }
            ScalarValue::IntervalYearMonth(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::IntervalYearmonthValue(*s)
                })
            }
            ScalarValue::Null => Ok(protobuf::ScalarValue {
                value: Some(Value::NullValue((&data_type).try_into()?)),
            }),

            ScalarValue::Binary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryValue(s.to_owned())
                })
            }
            ScalarValue::BinaryView(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::BinaryViewValue(s.to_owned())
                })
            }
            ScalarValue::LargeBinary(val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::LargeBinaryValue(s.to_owned())
                })
            }
            ScalarValue::FixedSizeBinary(length, val) => {
                create_proto_scalar(val.as_ref(), &data_type, |s| {
                    Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
                        values: s.to_owned(),
                        length: *length,
                    })
                })
            }

            ScalarValue::Time32Second(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
                        ),
                    })
                })
            }

            ScalarValue::Time32Millisecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time32Value(protobuf::ScalarTime32Value {
                        value: Some(
                            protobuf::scalar_time32_value::Value::Time32MillisecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Microsecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            ScalarValue::Time64Nanosecond(v) => {
                create_proto_scalar(v.as_ref(), &data_type, |v| {
                    Value::Time64Value(protobuf::ScalarTime64Value {
                        value: Some(
                            protobuf::scalar_time64_value::Value::Time64NanosecondValue(
                                *v,
                            ),
                        ),
                    })
                })
            }

            // Interval values are split into their component parts before
            // serialization; a None payload becomes a typed NullValue.
            ScalarValue::IntervalDayTime(val) => {
                let value = if let Some(v) = val {
                    let (days, milliseconds) = IntervalDayTimeType::to_parts(*v);
                    Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue {
                        days,
                        milliseconds,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::IntervalMonthDayNano(v) => {
                let value = if let Some(v) = v {
                    let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
                    Value::IntervalMonthDayNano(protobuf::IntervalMonthDayNanoValue {
                        months,
                        days,
                        nanos,
                    })
                } else {
                    Value::NullValue((&data_type).try_into()?)
                };

                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            ScalarValue::DurationSecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationSecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMillisecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMillisecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationMicrosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationMicrosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }
            ScalarValue::DurationNanosecond(v) => {
                let value = match v {
                    Some(v) => Value::DurationNanosecondValue(*v),
                    None => Value::NullValue((&data_type).try_into()?),
                };
                Ok(protobuf::ScalarValue { value: Some(value) })
            }

            // Unions serialize the full field list, the mode, and (when
            // present) the active variant's id and value.
            ScalarValue::Union(val, df_fields, mode) => {
                let mut fields =
                    Vec::<protobuf::UnionField>::with_capacity(df_fields.len());
                for (id, field) in df_fields.iter() {
                    let field_id = id as i32;
                    let field = Some(field.as_ref().try_into()?);
                    let field = protobuf::UnionField { field_id, field };
                    fields.push(field);
                }
                let mode = match mode {
                    UnionMode::Sparse => 0,
                    UnionMode::Dense => 1,
                };
                let value = match val {
                    None => None,
                    Some((_id, v)) => Some(Box::new(v.as_ref().try_into()?)),
                };
                let val = protobuf::UnionValue {
                    // A null union uses value_id 0 as a placeholder.
                    value_id: val.as_ref().map(|(id, _v)| *id as i32).unwrap_or(0),
                    value,
                    fields,
                    mode,
                };
                let val = Value::UnionValue(Box::new(val));
                let val = protobuf::ScalarValue { value: Some(val) };
                Ok(val)
            }

            // Dictionaries serialize the key (index) type plus the inner
            // value, recursing through this same impl.
            ScalarValue::Dictionary(index_type, val) => {
                let value: protobuf::ScalarValue = val.as_ref().try_into()?;
                Ok(protobuf::ScalarValue {
                    value: Some(Value::DictionaryValue(Box::new(
                        protobuf::ScalarDictionaryValue {
                            index_type: Some(index_type.as_ref().try_into()?),
                            value: Some(Box::new(value)),
                        },
                    ))),
                })
            }
        }
    }
}
641
642impl From<&TimeUnit> for protobuf::TimeUnit {
643 fn from(val: &TimeUnit) -> Self {
644 match val {
645 TimeUnit::Second => protobuf::TimeUnit::Second,
646 TimeUnit::Millisecond => protobuf::TimeUnit::Millisecond,
647 TimeUnit::Microsecond => protobuf::TimeUnit::Microsecond,
648 TimeUnit::Nanosecond => protobuf::TimeUnit::Nanosecond,
649 }
650 }
651}
652
653impl From<&IntervalUnit> for protobuf::IntervalUnit {
654 fn from(interval_unit: &IntervalUnit) -> Self {
655 match interval_unit {
656 IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
657 IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
658 IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
659 }
660 }
661}
662
663impl From<Constraints> for protobuf::Constraints {
664 fn from(value: Constraints) -> Self {
665 let constraints = value.into_iter().map(|item| item.into()).collect();
666 protobuf::Constraints { constraints }
667 }
668}
669
670impl From<Constraint> for protobuf::Constraint {
671 fn from(value: Constraint) -> Self {
672 let res = match value {
673 Constraint::PrimaryKey(indices) => {
674 let indices = indices.into_iter().map(|item| item as u64).collect();
675 protobuf::constraint::ConstraintMode::PrimaryKey(
676 protobuf::PrimaryKeyConstraint { indices },
677 )
678 }
679 Constraint::Unique(indices) => {
680 let indices = indices.into_iter().map(|item| item as u64).collect();
681 protobuf::constraint::ConstraintMode::PrimaryKey(
682 protobuf::PrimaryKeyConstraint { indices },
683 )
684 }
685 };
686 protobuf::Constraint {
687 constraint_mode: Some(res),
688 }
689 }
690}
691
692impl From<&Precision<usize>> for protobuf::Precision {
693 fn from(s: &Precision<usize>) -> protobuf::Precision {
694 match s {
695 Precision::Exact(val) => protobuf::Precision {
696 precision_info: protobuf::PrecisionInfo::Exact.into(),
697 val: Some(crate::protobuf_common::ScalarValue {
698 value: Some(Value::Uint64Value(*val as u64)),
699 }),
700 },
701 Precision::Inexact(val) => protobuf::Precision {
702 precision_info: protobuf::PrecisionInfo::Inexact.into(),
703 val: Some(crate::protobuf_common::ScalarValue {
704 value: Some(Value::Uint64Value(*val as u64)),
705 }),
706 },
707 Precision::Absent => protobuf::Precision {
708 precision_info: protobuf::PrecisionInfo::Absent.into(),
709 val: Some(crate::protobuf_common::ScalarValue { value: None }),
710 },
711 }
712 }
713}
714
impl From<&Precision<datafusion_common::ScalarValue>> for protobuf::Precision {
    /// Serialize a scalar-valued [`Precision`] (min/max/sum statistics).
    ///
    /// NOTE: `val.try_into().ok()` silently drops the value (leaving `val`
    /// as `None`) if the scalar cannot be serialized, while still marking
    /// the precision as Exact/Inexact.
    fn from(s: &Precision<datafusion_common::ScalarValue>) -> protobuf::Precision {
        match s {
            Precision::Exact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Exact.into(),
                val: val.try_into().ok(),
            },
            Precision::Inexact(val) => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Inexact.into(),
                val: val.try_into().ok(),
            },
            Precision::Absent => protobuf::Precision {
                precision_info: protobuf::PrecisionInfo::Absent.into(),
                val: Some(crate::protobuf_common::ScalarValue { value: None }),
            },
        }
    }
}
733
impl From<&Statistics> for protobuf::Statistics {
    /// Serialize table-level [`Statistics`]: row count, byte size, and
    /// per-column statistics.
    fn from(s: &Statistics) -> protobuf::Statistics {
        let column_stats = s.column_statistics.iter().map(|s| s.into()).collect();
        protobuf::Statistics {
            num_rows: Some(protobuf::Precision::from(&s.num_rows)),
            total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)),
            column_stats,
        }
    }
}
744
impl From<&ColumnStatistics> for protobuf::ColumnStats {
    /// Serialize per-column statistics, wrapping each precision value.
    fn from(s: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: Some(protobuf::Precision::from(&s.min_value)),
            max_value: Some(protobuf::Precision::from(&s.max_value)),
            sum_value: Some(protobuf::Precision::from(&s.sum_value)),
            null_count: Some(protobuf::Precision::from(&s.null_count)),
            distinct_count: Some(protobuf::Precision::from(&s.distinct_count)),
        }
    }
}
756
757impl From<JoinSide> for protobuf::JoinSide {
758 fn from(t: JoinSide) -> Self {
759 match t {
760 JoinSide::Left => protobuf::JoinSide::LeftSide,
761 JoinSide::Right => protobuf::JoinSide::RightSide,
762 JoinSide::None => protobuf::JoinSide::None,
763 }
764 }
765}
766
impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
    /// Map a file-compression variant to the matching protobuf enum.
    fn from(value: &CompressionTypeVariant) -> Self {
        match value {
            CompressionTypeVariant::GZIP => Self::Gzip,
            CompressionTypeVariant::BZIP2 => Self::Bzip2,
            CompressionTypeVariant::XZ => Self::Xz,
            CompressionTypeVariant::ZSTD => Self::Zstd,
            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
        }
    }
}
778
impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions {
    type Error = DataFusionError;

    /// Serialize CSV writer options; delegates to the shared helper
    /// [`csv_writer_options_to_proto`]. Infallible in practice.
    fn try_from(opts: &CsvWriterOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(csv_writer_options_to_proto(
            &opts.writer_options,
            &opts.compression,
        ))
    }
}
789
impl TryFrom<&JsonWriterOptions> for protobuf::JsonWriterOptions {
    type Error = DataFusionError;

    /// Serialize JSON writer options (currently just the compression type).
    fn try_from(
        opts: &JsonWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonWriterOptions {
            compression: compression.into(),
        })
    }
}
802
impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
    type Error = DataFusionError;

    /// Serialize session-level [`ParquetOptions`] field-by-field.
    ///
    /// Optional fields map to protobuf `oneof` wrappers (the `*_opt`
    /// variants); `usize` sizes are widened to u64 for the wire format.
    fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as u64,
            write_batch_size: value.write_batch_size as u64,
            writer_version: value.writer_version.clone(),
            compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
            dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
            statistics_enabled_opt: value.statistics_enabled.clone().map(protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
            // `max_statistics_size` is deprecated upstream but still
            // serialized for backward compatibility.
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| protobuf::parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v as u64)),
            max_row_group_size: value.max_row_group_size as u64,
            created_by: value.created_by.clone(),
            column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
            statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
            data_page_row_count_limit: value.data_page_row_count_limit as u64,
            encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
            maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
        })
    }
}
843
impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions {
    type Error = DataFusionError;

    /// Serialize per-column Parquet overrides. Every field is optional and
    /// maps to a protobuf `oneof` wrapper, so `None` simply omits it.
    fn try_from(
        value: &ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(protobuf::ParquetColumnOptions {
            compression_opt: value
                .compression
                .clone()
                .map(protobuf::parquet_column_options::CompressionOpt::Compression),
            dictionary_enabled_opt: value
                .dictionary_enabled
                .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled),
            statistics_enabled_opt: value
                .statistics_enabled
                .clone()
                .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled),
            // Deprecated upstream; still serialized for compatibility.
            // Note: widened to u32 here, unlike the u64 in session options.
            #[allow(deprecated)]
            max_statistics_size_opt: value.max_statistics_size.map(|v| {
                protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(
                    v as u32,
                )
            }),
            encoding_opt: value
                .encoding
                .clone()
                .map(protobuf::parquet_column_options::EncodingOpt::Encoding),
            bloom_filter_enabled_opt: value
                .bloom_filter_enabled
                .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled),
            bloom_filter_fpp_opt: value
                .bloom_filter_fpp
                .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
            bloom_filter_ndv_opt: value
                .bloom_filter_ndv
                .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
        })
    }
}
884
impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions {
    type Error = DataFusionError;
    /// Serialize table-level Parquet options: the global options, each
    /// column-specific override, and key/value metadata.
    fn try_from(
        value: &TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let column_specific_options = value
            .column_specific_options
            .iter()
            .map(|(k, v)| {
                Ok(protobuf::ParquetColumnSpecificOptions {
                    column_name: k.into(),
                    options: Some(v.try_into()?),
                })
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        // Entries whose value is `None` are dropped — the proto map cannot
        // represent a key without a value.
        let key_value_metadata = value
            .key_value_metadata
            .iter()
            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
            .collect::<HashMap<String, String>>();
        Ok(protobuf::TableParquetOptions {
            global: Some((&value.global).try_into()?),
            column_specific_options,
            key_value_metadata,
        })
    }
}
912
impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
    type Error = DataFusionError;

    /// Serialize CSV reader/writer options.
    ///
    /// Optional single-byte settings are encoded as 0- or 1-element byte
    /// vectors (empty vec = unset); optional strings fall back to "".
    fn try_from(opts: &CsvOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::CsvOptions {
            has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]),
            delimiter: vec![opts.delimiter],
            quote: vec![opts.quote],
            terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]),
            escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
            double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
            newlines_in_values: opts
                .newlines_in_values
                .map_or_else(Vec::new, |h| vec![h as u8]),
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
            date_format: opts.date_format.clone().unwrap_or_default(),
            datetime_format: opts.datetime_format.clone().unwrap_or_default(),
            timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
            timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
            time_format: opts.time_format.clone().unwrap_or_default(),
            null_value: opts.null_value.clone().unwrap_or_default(),
            null_regex: opts.null_regex.clone().unwrap_or_default(),
            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
        })
    }
}
941
impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
    type Error = DataFusionError;

    /// Serialize JSON reader options: compression plus the optional
    /// schema-inference row limit.
    fn try_from(opts: &JsonOptions) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = opts.compression.into();
        Ok(protobuf::JsonOptions {
            compression: compression.into(),
            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
        })
    }
}
953
954fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
957 v: Option<&I>,
958 null_arrow_type: &DataType,
959 constructor: T,
960) -> Result<protobuf::ScalarValue, Error> {
961 let value = v
962 .map(constructor)
963 .unwrap_or(protobuf::scalar_value::Value::NullValue(
964 null_arrow_type.try_into()?,
965 ));
966
967 Ok(protobuf::ScalarValue { value: Some(value) })
968}
969
/// Encode a nested scalar (List/LargeList/FixedSizeList/Struct/Map) by
/// wrapping its backing array in a single-column RecordBatch and
/// serializing it with the Arrow IPC writer.
///
/// The `val` argument only selects which protobuf oneof variant receives
/// the encoded payload; the data itself comes from `arr`.
fn encode_scalar_nested_value(
    arr: ArrayRef,
    val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
    // The column name is arbitrary — only the payload bytes matter.
    let batch = RecordBatch::try_from_iter(vec![("field_name", arr)]).map_err(|e| {
        Error::General(format!(
            "Error creating temporary batch while encoding ScalarValue::List: {e}"
        ))
    })?;

    let gen = IpcDataGenerator {};
    // `false`: do not error on replacement of existing dictionaries.
    let mut dict_tracker = DictionaryTracker::new(false);
    let (encoded_dictionaries, encoded_message) = gen
        .encoded_batch(&batch, &mut dict_tracker, &Default::default())
        .map_err(|e| {
            Error::General(format!("Error encoding ScalarValue::List as IPC: {e}"))
        })?;

    // The schema is also shipped so the reader can decode the IPC bytes.
    let schema: protobuf::Schema = batch.schema().try_into()?;

    let scalar_list_value = protobuf::ScalarNestedValue {
        ipc_message: encoded_message.ipc_message,
        arrow_data: encoded_message.arrow_data,
        dictionaries: encoded_dictionaries
            .into_iter()
            .map(|data| protobuf::scalar_nested_value::Dictionary {
                ipc_message: data.ipc_message,
                arrow_data: data.arrow_data,
            })
            .collect(),
        schema: Some(schema),
    };

    match val {
        ScalarValue::List(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::ListValue(scalar_list_value)),
        }),
        ScalarValue::LargeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::LargeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::FixedSizeList(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::FixedSizeListValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::StructValue(
                scalar_list_value,
            )),
        }),
        ScalarValue::Map(_) => Ok(protobuf::ScalarValue {
            value: Some(protobuf::scalar_value::Value::MapValue(scalar_list_value)),
        }),
        // Callers only pass the nested variants listed above.
        _ => unreachable!(),
    }
}
1030
1031fn convert_arc_fields_to_proto_fields<'a, I>(
1033 fields: I,
1034) -> Result<Vec<protobuf::Field>, Error>
1035where
1036 I: IntoIterator<Item = &'a Arc<Field>>,
1037{
1038 fields
1039 .into_iter()
1040 .map(|field| field.as_ref().try_into())
1041 .collect::<Result<Vec<_>, Error>>()
1042}
1043
/// Flatten an Arrow CSV [`WriterBuilder`] plus a compression setting into
/// the protobuf `CsvWriterOptions` message.
///
/// Single-byte settings (delimiter/quote/escape) are encoded as 1-char
/// strings; unset format strings become "".
pub(crate) fn csv_writer_options_to_proto(
    csv_options: &WriterBuilder,
    compression: &CompressionTypeVariant,
) -> protobuf::CsvWriterOptions {
    let compression: protobuf::CompressionTypeVariant = compression.into();
    protobuf::CsvWriterOptions {
        compression: compression.into(),
        delimiter: (csv_options.delimiter() as char).to_string(),
        has_header: csv_options.header(),
        date_format: csv_options.date_format().unwrap_or("").to_owned(),
        datetime_format: csv_options.datetime_format().unwrap_or("").to_owned(),
        timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
        time_format: csv_options.time_format().unwrap_or("").to_owned(),
        null_value: csv_options.null().to_owned(),
        quote: (csv_options.quote() as char).to_string(),
        escape: (csv_options.escape() as char).to_string(),
        double_quote: csv_options.double_quote(),
    }
}