1use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28 DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29 TimeUnit, UnionFields, UnionMode, i256,
30};
31use arrow::ipc::{
32 convert::fb_to_schema,
33 reader::{read_dictionary, read_record_batch},
34 root_as_message,
35 writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
36};
37
38use datafusion_common::{
39 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
40 DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
41 arrow_datafusion_err,
42 config::{
43 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
44 TableParquetOptions,
45 },
46 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
47 parsers::CompressionTypeVariant,
48 plan_datafusion_err,
49 stats::Precision,
50};
51
/// Errors that can arise while deserializing protobuf messages into
/// DataFusion / Arrow types.
#[derive(Debug)]
pub enum Error {
    /// Free-form error message.
    General(String),

    /// Wrapper around an underlying [`DataFusionError`].
    DataFusionError(DataFusionError),

    /// A required (non-optional) protobuf field was not set on the wire.
    MissingRequiredField(String),

    /// A repeated field that must contain at least one element was empty.
    AtLeastOneValue(String),

    /// An `i32` read from the wire does not correspond to any variant of the
    /// named protobuf enum.
    UnknownEnumVariant { name: String, value: i32 },
}
64
65impl std::fmt::Display for Error {
66 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
67 match self {
68 Self::General(desc) => write!(f, "General error: {desc}"),
69
70 Self::DataFusionError(desc) => {
71 write!(f, "DataFusion error: {desc:?}")
72 }
73
74 Self::MissingRequiredField(name) => {
75 write!(f, "Missing required field {name}")
76 }
77 Self::AtLeastOneValue(name) => {
78 write!(f, "Must have at least one {name}, found 0")
79 }
80 Self::UnknownEnumVariant { name, value } => {
81 write!(f, "Unknown i32 value for {name} enum: {value}")
82 }
83 }
84 }
85}
86
// Marker impl: `Display` + `Debug` above are sufficient for `std::error::Error`.
impl std::error::Error for Error {}
88
89impl From<DataFusionError> for Error {
90 fn from(e: DataFusionError) -> Self {
91 Error::DataFusionError(e)
92 }
93}
94
95impl Error {
96 pub fn required(field: impl Into<String>) -> Error {
97 Error::MissingRequiredField(field.into())
98 }
99
100 pub fn unknown(name: impl Into<String>, value: i32) -> Error {
101 Error::UnknownEnumVariant {
102 name: name.into(),
103 value,
104 }
105 }
106}
107
impl From<Error> for DataFusionError {
    /// Converts a deserialization [`Error`] into a planning
    /// `DataFusionError`, keeping only the display message.
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
113
/// Extension trait adding `optional` / `required` helpers to `Option`s of
/// protobuf messages that convert into native types via `TryInto`.
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field into `Option<T>`, propagating any
    /// conversion error of the inner value.
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field into `T`, returning
    /// [`Error::MissingRequiredField`] (with the given field name) if unset.
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
129
130impl<T, U> FromOptionalField<U> for Option<T>
131where
132 T: TryInto<U, Error = Error>,
133{
134 fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
135 self.map(|t| t.try_into()).transpose()
136 }
137
138 fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
139 match self {
140 None => Err(Error::required(field)),
141 Some(t) => t.try_into(),
142 }
143 }
144}
145
146impl From<protobuf::ColumnRelation> for TableReference {
147 fn from(rel: protobuf::ColumnRelation) -> Self {
148 Self::parse_str_normalized(rel.relation.as_str(), true)
149 }
150}
151
152impl From<protobuf::Column> for Column {
153 fn from(c: protobuf::Column) -> Self {
154 let protobuf::Column { relation, name } = c;
155
156 Self::new(relation, name)
157 }
158}
159
160impl From<&protobuf::Column> for Column {
161 fn from(c: &protobuf::Column) -> Self {
162 c.clone().into()
163 }
164}
165
166impl TryFrom<&protobuf::DfSchema> for DFSchema {
167 type Error = Error;
168
169 fn try_from(
170 df_schema: &protobuf::DfSchema,
171 ) -> datafusion_common::Result<Self, Self::Error> {
172 let df_fields = df_schema.columns.clone();
173 let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
174 .iter()
175 .map(|df_field| {
176 let field: Field = df_field.field.as_ref().required("field")?;
177 Ok((
178 df_field.qualifier.as_ref().map(|q| q.clone().into()),
179 Arc::new(field),
180 ))
181 })
182 .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
183
184 Ok(DFSchema::new_with_metadata(
185 qualifiers_and_fields,
186 df_schema.metadata.clone(),
187 )?)
188 }
189}
190
191impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
192 type Error = Error;
193
194 fn try_from(
195 df_schema: protobuf::DfSchema,
196 ) -> datafusion_common::Result<Self, Self::Error> {
197 let dfschema: DFSchema = (&df_schema).try_into()?;
198 Ok(Arc::new(dfschema))
199 }
200}
201
202impl TryFrom<&protobuf::ArrowType> for DataType {
203 type Error = Error;
204
205 fn try_from(
206 arrow_type: &protobuf::ArrowType,
207 ) -> datafusion_common::Result<Self, Self::Error> {
208 arrow_type
209 .arrow_type_enum
210 .as_ref()
211 .required("arrow_type_enum")
212 }
213}
214
impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    /// Maps a serialized `ArrowTypeEnum` oneof onto the corresponding Arrow
    /// [`DataType`], recursing through nested field definitions for container
    /// types (lists, struct, union, map, dictionary, run-end encoded).
    ///
    /// Errors when a required nested field is missing or an embedded enum
    /// value (time unit, interval unit, union mode) is unknown.
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            // Payload-free primitives: the variant tag alone fixes the type.
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            // Temporal types carry their unit as an i32 enum value.
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                // Empty string encodes "no timezone".
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            // Decimal precision/scale travel as wider protobuf ints and are
            // narrowed back to the u8/i8 Arrow representation.
            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
                precision,
                scale,
            }) => DataType::Decimal32(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
                precision,
                scale,
            }) => DataType::Decimal64(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            // List-like types require their element field definition.
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // When no explicit type ids were serialized, ids are derived
                // from field order; otherwise pair them up explicitly.
                let union_fields = if union.type_ids.is_empty() {
                    UnionFields::from_fields(union_fields)
                } else {
                    let type_ids = union.type_ids.iter().map(|i| *i as i8);
                    UnionFields::try_new(type_ids, union_fields).map_err(|e| {
                        DataFusionError::from(e).context("Deserializing Union DataType")
                    })?
                };
                DataType::Union(union_fields, union_mode)
            }
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
            arrow_type::ArrowTypeEnum::RunEndEncoded(run_end_encoded) => {
                let run_ends_field: Field = run_end_encoded
                    .as_ref()
                    .run_ends_field
                    .as_deref()
                    .required("run_ends_field")?;
                let value_field: Field = run_end_encoded
                    .as_ref()
                    .values_field
                    .as_deref()
                    .required("values_field")?;
                DataType::RunEndEncoded(run_ends_field.into(), value_field.into())
            }
        })
    }
}
350
351impl TryFrom<&protobuf::Field> for Field {
352 type Error = Error;
353 fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
354 let datatype = field.arrow_type.as_deref().required("arrow_type")?;
355 let field = Self::new(field.name.as_str(), datatype, field.nullable)
356 .with_metadata(field.metadata.clone());
357 Ok(field)
358 }
359}
360
361impl TryFrom<&protobuf::Schema> for Schema {
362 type Error = Error;
363
364 fn try_from(
365 schema: &protobuf::Schema,
366 ) -> datafusion_common::Result<Self, Self::Error> {
367 let fields = schema
368 .columns
369 .iter()
370 .map(Field::try_from)
371 .collect::<datafusion_common::Result<Vec<_>, _>>()?;
372 Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
373 }
374}
375
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    /// Deserializes a protobuf `ScalarValue` into a DataFusion
    /// [`ScalarValue`].
    ///
    /// Simple variants map field-by-field. Nested variants (list, struct,
    /// map, …) were serialized as a single-column Arrow IPC record batch and
    /// are decoded back through the IPC reader, including any dictionary
    /// batches they reference.
    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        // The `value` oneof is required; an unset oneof is malformed input.
        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            // 8/16-bit integers travel as wider protobuf ints; narrow them back.
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // Nested scalars: an IPC-encoded single-column record batch plus
            // the dictionary batches it depends on.
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                // The batch schema itself is serialized alongside the data.
                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing nested ScalarValue"
                            .to_string(),
                    ));
                };

                // NOTE(review): the decoded schema is round-tripped through
                // the IPC writer (with a fresh DictionaryTracker) and read
                // back via flatbuffers — presumably to regenerate IPC-side
                // state such as dictionary ids consistently with how the data
                // buffers were written; confirm before simplifying.
                let schema: Schema = {
                    let ipc_gen = IpcDataGenerator {};
                    let write_options = IpcWriteOptions::default();
                    let mut dict_tracker = DictionaryTracker::new(false);
                    let encoded_schema = ipc_gen.schema_to_bytes_with_dictionary_tracker(
                        &schema,
                        &mut dict_tracker,
                        &write_options,
                    );
                    let message =
                        root_as_message(encoded_schema.ipc_message.as_slice()).map_err(
                            |e| {
                                Error::General(format!(
                                    "Error IPC schema message while deserializing nested ScalarValue: {e}"
                                ))
                            },
                        )?;
                    let ipc_schema = message.header_as_schema().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing nested ScalarValue schema"
                                .to_string(),
                        )
                    })?;
                    fb_to_schema(ipc_schema)
                };

                // Decode the record-batch flatbuffer header and body buffer.
                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing nested ScalarValue: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing nested ScalarValue"
                            .to_string(),
                    )
                })?;

                // Replay every dictionary batch so dictionary-encoded columns
                // can be resolved when the record batch is read below.
                let mut dict_by_id: HashMap<i64, ArrayRef> = HashMap::new();
                for protobuf::scalar_nested_value::Dictionary {
                    ipc_message,
                    arrow_data,
                } in dictionaries
                {
                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                        Error::General(format!(
                            "Error IPC message while deserializing nested ScalarValue dictionary message: {e}"
                        ))
                    })?;
                    let buffer = Buffer::from(arrow_data.as_slice());

                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing nested ScalarValue dictionary message"
                                .to_string(),
                        )
                    })?;
                    read_dictionary(
                        &buffer,
                        dict_batch,
                        &schema,
                        &mut dict_by_id,
                        &message.version(),
                    )
                    .map_err(|e| arrow_datafusion_err!(e))
                    .map_err(|e| e.context("Decoding nested ScalarValue dictionary"))?;
                }

                // No projection (`None`): the batch holds exactly one column.
                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding nested ScalarValue value"))?;
                let arr = record_batch.column(0);
                // Re-wrap the decoded array in the ScalarValue variant that
                // matches the oneof case we matched on above.
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // The outer match restricted `value` to the five nested
                    // cases handled above.
                    _ => unreachable!(),
                }
            }
            // A typed NULL: decode its data type, then build the null scalar
            // of that type.
            Value::NullValue(v) => {
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            // Decimals carry big-endian value bytes plus precision (`p`) and
            // scale (`s`); `vec_to_array` fixes the byte-array length.
            Value::Decimal32Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal64Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                // The unit-specific payload is a required nested oneof.
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // Empty string encodes "no timezone".
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                // The dictionary's value is itself a boxed, serialized
                // ScalarValue; convert it recursively.
                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::RunEndEncodedValue(v) => {
                let run_ends_field: Field = v
                    .run_ends_field
                    .as_ref()
                    .ok_or_else(|| Error::required("run_ends_field"))?
                    .try_into()?;

                let values_field: Field = v
                    .values_field
                    .as_ref()
                    .ok_or_else(|| Error::required("values_field"))?
                    .try_into()?;

                // The wrapped value is converted recursively.
                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::RunEndEncoded(
                    run_ends_field.into(),
                    values_field.into(),
                    Box::new(value),
                )
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                // Mode is serialized as a bare i32: 0 = sparse, 1 = dense.
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // Every serialized union field must carry its field
                // definition; a single missing one fails the whole scalar.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let union_fields = UnionFields::try_new(ids, fields).map_err(|e| {
                    DataFusionError::from(e).context("Deserializing Union ScalarValue")
                })?;
                let v_id = val.value_id as i8;
                // A `None` payload encodes a null union scalar.
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, union_fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                Self::FixedSizeBinary(v.length, Some(v.clone().values))
            }
        })
    }
}
702
703impl From<protobuf::TimeUnit> for TimeUnit {
704 fn from(time_unit: protobuf::TimeUnit) -> Self {
705 match time_unit {
706 protobuf::TimeUnit::Second => TimeUnit::Second,
707 protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
708 protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
709 protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
710 }
711 }
712}
713
714impl From<protobuf::IntervalUnit> for IntervalUnit {
715 fn from(interval_unit: protobuf::IntervalUnit) -> Self {
716 match interval_unit {
717 protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
718 protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
719 protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
720 }
721 }
722}
723
724impl From<protobuf::Constraints> for Constraints {
725 fn from(constraints: protobuf::Constraints) -> Self {
726 Constraints::new_unverified(
727 constraints
728 .constraints
729 .into_iter()
730 .map(|item| item.into())
731 .collect(),
732 )
733 }
734}
735
736impl From<protobuf::Constraint> for Constraint {
737 fn from(value: protobuf::Constraint) -> Self {
738 match value.constraint_mode.unwrap() {
739 protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
740 Constraint::PrimaryKey(
741 elem.indices.into_iter().map(|item| item as usize).collect(),
742 )
743 }
744 protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
745 elem.indices.into_iter().map(|item| item as usize).collect(),
746 ),
747 }
748 }
749}
750
751impl From<&protobuf::ColumnStats> for ColumnStatistics {
752 fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
753 ColumnStatistics {
754 null_count: if let Some(nc) = &cs.null_count {
755 nc.clone().into()
756 } else {
757 Precision::Absent
758 },
759 max_value: if let Some(max) = &cs.max_value {
760 max.clone().into()
761 } else {
762 Precision::Absent
763 },
764 min_value: if let Some(min) = &cs.min_value {
765 min.clone().into()
766 } else {
767 Precision::Absent
768 },
769 sum_value: if let Some(sum) = &cs.sum_value {
770 sum.clone().into()
771 } else {
772 Precision::Absent
773 },
774 distinct_count: if let Some(dc) = &cs.distinct_count {
775 dc.clone().into()
776 } else {
777 Precision::Absent
778 },
779 byte_size: if let Some(sbs) = &cs.byte_size {
780 sbs.clone().into()
781 } else {
782 Precision::Absent
783 },
784 }
785 }
786}
787
788impl From<protobuf::Precision> for Precision<usize> {
789 fn from(s: protobuf::Precision) -> Self {
790 let Ok(precision_type) = s.precision_info.try_into() else {
791 return Precision::Absent;
792 };
793 match precision_type {
794 protobuf::PrecisionInfo::Exact => {
795 if let Some(val) = s.val {
796 if let Ok(ScalarValue::UInt64(Some(val))) =
797 ScalarValue::try_from(&val)
798 {
799 Precision::Exact(val as usize)
800 } else {
801 Precision::Absent
802 }
803 } else {
804 Precision::Absent
805 }
806 }
807 protobuf::PrecisionInfo::Inexact => {
808 if let Some(val) = s.val {
809 if let Ok(ScalarValue::UInt64(Some(val))) =
810 ScalarValue::try_from(&val)
811 {
812 Precision::Inexact(val as usize)
813 } else {
814 Precision::Absent
815 }
816 } else {
817 Precision::Absent
818 }
819 }
820 protobuf::PrecisionInfo::Absent => Precision::Absent,
821 }
822 }
823}
824
825impl From<protobuf::Precision> for Precision<ScalarValue> {
826 fn from(s: protobuf::Precision) -> Self {
827 let Ok(precision_type) = s.precision_info.try_into() else {
828 return Precision::Absent;
829 };
830 match precision_type {
831 protobuf::PrecisionInfo::Exact => {
832 if let Some(val) = s.val {
833 if let Ok(val) = ScalarValue::try_from(&val) {
834 Precision::Exact(val)
835 } else {
836 Precision::Absent
837 }
838 } else {
839 Precision::Absent
840 }
841 }
842 protobuf::PrecisionInfo::Inexact => {
843 if let Some(val) = s.val {
844 if let Ok(val) = ScalarValue::try_from(&val) {
845 Precision::Inexact(val)
846 } else {
847 Precision::Absent
848 }
849 } else {
850 Precision::Absent
851 }
852 }
853 protobuf::PrecisionInfo::Absent => Precision::Absent,
854 }
855 }
856}
857
858impl From<protobuf::JoinSide> for JoinSide {
859 fn from(t: protobuf::JoinSide) -> Self {
860 match t {
861 protobuf::JoinSide::LeftSide => JoinSide::Left,
862 protobuf::JoinSide::RightSide => JoinSide::Right,
863 protobuf::JoinSide::None => JoinSide::None,
864 }
865 }
866}
867
868impl From<&protobuf::Constraint> for Constraint {
869 fn from(value: &protobuf::Constraint) -> Self {
870 match &value.constraint_mode {
871 Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
872 Constraint::PrimaryKey(
873 elem.indices.iter().map(|&item| item as usize).collect(),
874 )
875 }
876 Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
877 Constraint::Unique(
878 elem.indices.iter().map(|&item| item as usize).collect(),
879 )
880 }
881 None => panic!("constraint_mode not set"),
882 }
883 }
884}
885
886impl TryFrom<&protobuf::Constraints> for Constraints {
887 type Error = DataFusionError;
888
889 fn try_from(
890 constraints: &protobuf::Constraints,
891 ) -> datafusion_common::Result<Self, Self::Error> {
892 Ok(Constraints::new_unverified(
893 constraints
894 .constraints
895 .iter()
896 .map(|item| item.into())
897 .collect(),
898 ))
899 }
900}
901
902impl TryFrom<&protobuf::Statistics> for Statistics {
903 type Error = DataFusionError;
904
905 fn try_from(
906 s: &protobuf::Statistics,
907 ) -> datafusion_common::Result<Self, Self::Error> {
908 Ok(Statistics {
910 num_rows: if let Some(nr) = &s.num_rows {
911 nr.clone().into()
912 } else {
913 Precision::Absent
914 },
915 total_byte_size: if let Some(tbs) = &s.total_byte_size {
916 tbs.clone().into()
917 } else {
918 Precision::Absent
919 },
920 column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
922 })
923 }
924}
925
926impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
927 fn from(value: protobuf::CompressionTypeVariant) -> Self {
928 match value {
929 protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
930 protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
931 protobuf::CompressionTypeVariant::Xz => Self::XZ,
932 protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
933 protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
934 }
935 }
936}
937
938impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
939 fn from(value: CompressionTypeVariant) -> Self {
940 match value {
941 CompressionTypeVariant::GZIP => Self::Gzip,
942 CompressionTypeVariant::BZIP2 => Self::Bzip2,
943 CompressionTypeVariant::XZ => Self::Xz,
944 CompressionTypeVariant::ZSTD => Self::Zstd,
945 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
946 }
947 }
948}
949
950impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
951 type Error = DataFusionError;
952
953 fn try_from(
954 opts: &protobuf::CsvWriterOptions,
955 ) -> datafusion_common::Result<Self, Self::Error> {
956 let write_options = csv_writer_options_from_proto(opts)?;
957 let compression: CompressionTypeVariant = opts.compression().into();
958 Ok(CsvWriterOptions::new(write_options, compression))
959 }
960}
961
962impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
963 type Error = DataFusionError;
964
965 fn try_from(
966 opts: &protobuf::JsonWriterOptions,
967 ) -> datafusion_common::Result<Self, Self::Error> {
968 let compression: CompressionTypeVariant = opts.compression().into();
969 Ok(JsonWriterOptions::new(compression))
970 }
971}
972
973impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
974 type Error = DataFusionError;
975
976 fn try_from(
977 proto_opts: &protobuf::CsvOptions,
978 ) -> datafusion_common::Result<Self, Self::Error> {
979 Ok(CsvOptions {
980 has_header: proto_opts.has_header.first().map(|h| *h != 0),
981 delimiter: proto_opts.delimiter[0],
982 quote: proto_opts.quote[0],
983 terminator: proto_opts.terminator.first().copied(),
984 escape: proto_opts.escape.first().copied(),
985 double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
986 newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
987 compression: proto_opts.compression().into(),
988 compression_level: proto_opts.compression_level,
989 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
990 date_format: (!proto_opts.date_format.is_empty())
991 .then(|| proto_opts.date_format.clone()),
992 datetime_format: (!proto_opts.datetime_format.is_empty())
993 .then(|| proto_opts.datetime_format.clone()),
994 timestamp_format: (!proto_opts.timestamp_format.is_empty())
995 .then(|| proto_opts.timestamp_format.clone()),
996 timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
997 .then(|| proto_opts.timestamp_tz_format.clone()),
998 time_format: (!proto_opts.time_format.is_empty())
999 .then(|| proto_opts.time_format.clone()),
1000 null_value: (!proto_opts.null_value.is_empty())
1001 .then(|| proto_opts.null_value.clone()),
1002 null_regex: (!proto_opts.null_regex.is_empty())
1003 .then(|| proto_opts.null_regex.clone()),
1004 comment: proto_opts.comment.first().copied(),
1005 truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
1006 })
1007 }
1008}
1009
1010impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
1011 type Error = DataFusionError;
1012
1013 fn try_from(
1014 value: &protobuf::ParquetOptions,
1015 ) -> datafusion_common::Result<Self, Self::Error> {
1016 Ok(ParquetOptions {
1017 enable_page_index: value.enable_page_index,
1018 pruning: value.pruning,
1019 skip_metadata: value.skip_metadata,
1020 metadata_size_hint: value
1021 .metadata_size_hint_opt
1022 .map(|opt| match opt {
1023 protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
1024 })
1025 .unwrap_or(None),
1026 pushdown_filters: value.pushdown_filters,
1027 reorder_filters: value.reorder_filters,
1028 force_filter_selections: value.force_filter_selections,
1029 data_pagesize_limit: value.data_pagesize_limit as usize,
1030 write_batch_size: value.write_batch_size as usize,
1031 writer_version: value.writer_version.parse().map_err(|e| {
1032 DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
1033 })?,
1034 compression: value.compression_opt.clone().map(|opt| match opt {
1035 protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
1036 }).unwrap_or(None),
1037 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1038 dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
1040 statistics_enabled: value
1041 .statistics_enabled_opt.clone()
1042 .map(|opt| match opt {
1043 protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1044 })
1045 .unwrap_or(None),
1046 max_row_group_size: value.max_row_group_size as usize,
1047 created_by: value.created_by.clone(),
1048 column_index_truncate_length: value
1049 .column_index_truncate_length_opt.as_ref()
1050 .map(|opt| match opt {
1051 protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
1052 })
1053 .unwrap_or(None),
1054 statistics_truncate_length: value
1055 .statistics_truncate_length_opt.as_ref()
1056 .map(|opt| match opt {
1057 protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
1058 })
1059 .unwrap_or(None),
1060 data_page_row_count_limit: value.data_page_row_count_limit as usize,
1061 encoding: value
1062 .encoding_opt.clone()
1063 .map(|opt| match opt {
1064 protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
1065 })
1066 .unwrap_or(None),
1067 bloom_filter_on_read: value.bloom_filter_on_read,
1068 bloom_filter_on_write: value.bloom_filter_on_write,
1069 bloom_filter_fpp: value.clone()
1070 .bloom_filter_fpp_opt
1071 .map(|opt| match opt {
1072 protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1073 })
1074 .unwrap_or(None),
1075 bloom_filter_ndv: value.clone()
1076 .bloom_filter_ndv_opt
1077 .map(|opt| match opt {
1078 protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1079 })
1080 .unwrap_or(None),
1081 allow_single_file_parallelism: value.allow_single_file_parallelism,
1082 maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
1083 maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
1084 schema_force_view_types: value.schema_force_view_types,
1085 binary_as_string: value.binary_as_string,
1086 coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1087 protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1088 }).unwrap_or(None),
1089 skip_arrow_metadata: value.skip_arrow_metadata,
1090 max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1091 protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1092 }).unwrap_or(None),
1093 })
1094 }
1095}
1096
1097impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1098 type Error = DataFusionError;
1099 fn try_from(
1100 value: &protobuf::ParquetColumnOptions,
1101 ) -> datafusion_common::Result<Self, Self::Error> {
1102 Ok(ParquetColumnOptions {
1103 compression: value.compression_opt.clone().map(|opt| match opt {
1104 protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1105 }).unwrap_or(None),
1106 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1107 statistics_enabled: value
1108 .statistics_enabled_opt.clone()
1109 .map(|opt| match opt {
1110 protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1111 })
1112 .unwrap_or(None),
1113 encoding: value
1114 .encoding_opt.clone()
1115 .map(|opt| match opt {
1116 protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1117 })
1118 .unwrap_or(None),
1119 bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1120 protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1121 })
1122 .unwrap_or(None),
1123 bloom_filter_fpp: value
1124 .bloom_filter_fpp_opt
1125 .map(|opt| match opt {
1126 protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1127 })
1128 .unwrap_or(None),
1129 bloom_filter_ndv: value
1130 .bloom_filter_ndv_opt
1131 .map(|opt| match opt {
1132 protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1133 })
1134 .unwrap_or(None),
1135 })
1136 }
1137}
1138
1139impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1140 type Error = DataFusionError;
1141 fn try_from(
1142 value: &protobuf::TableParquetOptions,
1143 ) -> datafusion_common::Result<Self, Self::Error> {
1144 let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1145 HashMap::new();
1146 for protobuf::ParquetColumnSpecificOptions {
1147 column_name,
1148 options: maybe_options,
1149 } in &value.column_specific_options
1150 {
1151 if let Some(options) = maybe_options {
1152 column_specific_options.insert(column_name.clone(), options.try_into()?);
1153 }
1154 }
1155 Ok(TableParquetOptions {
1156 global: value
1157 .global
1158 .as_ref()
1159 .map(|v| v.try_into())
1160 .unwrap()
1161 .unwrap(),
1162 column_specific_options,
1163 key_value_metadata: Default::default(),
1164 crypto: Default::default(),
1165 })
1166 }
1167}
1168
1169impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1170 type Error = DataFusionError;
1171
1172 fn try_from(
1173 proto_opts: &protobuf::JsonOptions,
1174 ) -> datafusion_common::Result<Self, Self::Error> {
1175 let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1176 Ok(JsonOptions {
1177 compression: compression.into(),
1178 compression_level: proto_opts.compression_level,
1179 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1180 newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
1181 })
1182 }
1183}
1184
1185pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1186 protobuf::TimeUnit::try_from(*value)
1187 .map(|t| t.into())
1188 .map_err(|_| Error::unknown("TimeUnit", *value))
1189}
1190
1191pub fn parse_i32_to_interval_unit(
1192 value: &i32,
1193) -> datafusion_common::Result<IntervalUnit, Error> {
1194 protobuf::IntervalUnit::try_from(*value)
1195 .map(|t| t.into())
1196 .map_err(|_| Error::unknown("IntervalUnit", *value))
1197}
1198
/// Converts a `Vec<T>` into a fixed-size array `[T; N]`.
///
/// # Panics
/// Panics when `v.len() != N`; callers are expected to have validated the
/// length beforehand.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    let actual_len = v.len();
    match v.try_into() {
        Ok(array) => array,
        Err(_) => panic!("Expected a Vec of length {} but it was {}", N, actual_len),
    }
}
1205
1206pub fn parse_proto_fields_to_fields<'a, I>(
1208 fields: I,
1209) -> std::result::Result<Vec<Field>, Error>
1210where
1211 I: IntoIterator<Item = &'a protobuf::Field>,
1212{
1213 fields
1214 .into_iter()
1215 .map(Field::try_from)
1216 .collect::<datafusion_common::Result<_, _>>()
1217}
1218
1219pub(crate) fn csv_writer_options_from_proto(
1220 writer_options: &protobuf::CsvWriterOptions,
1221) -> datafusion_common::Result<WriterBuilder> {
1222 let mut builder = WriterBuilder::new();
1223 if !writer_options.delimiter.is_empty() {
1224 if let Some(delimiter) = writer_options.delimiter.chars().next() {
1225 if delimiter.is_ascii() {
1226 builder = builder.with_delimiter(delimiter as u8);
1227 } else {
1228 return Err(proto_error("CSV Delimiter is not ASCII"));
1229 }
1230 } else {
1231 return Err(proto_error("Error parsing CSV Delimiter"));
1232 }
1233 }
1234 if !writer_options.quote.is_empty() {
1235 if let Some(quote) = writer_options.quote.chars().next() {
1236 if quote.is_ascii() {
1237 builder = builder.with_quote(quote as u8);
1238 } else {
1239 return Err(proto_error("CSV Quote is not ASCII"));
1240 }
1241 } else {
1242 return Err(proto_error("Error parsing CSV Quote"));
1243 }
1244 }
1245 if !writer_options.escape.is_empty() {
1246 if let Some(escape) = writer_options.escape.chars().next() {
1247 if escape.is_ascii() {
1248 builder = builder.with_escape(escape as u8);
1249 } else {
1250 return Err(proto_error("CSV Escape is not ASCII"));
1251 }
1252 } else {
1253 return Err(proto_error("Error parsing CSV Escape"));
1254 }
1255 }
1256 Ok(builder
1257 .with_header(writer_options.has_header)
1258 .with_date_format(writer_options.date_format.clone())
1259 .with_datetime_format(writer_options.datetime_format.clone())
1260 .with_timestamp_format(writer_options.timestamp_format.clone())
1261 .with_time_format(writer_options.time_format.clone())
1262 .with_null(writer_options.null_value.clone())
1263 .with_double_quote(writer_options.double_quote))
1264}