use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;

use crate::common::proto_error;
use crate::protobuf_common as protobuf;
use arrow::array::{ArrayRef, AsArray};
use arrow::buffer::Buffer;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{
    i256, DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
    Schema, TimeUnit, UnionFields, UnionMode,
};
use arrow::ipc::{reader::read_record_batch, root_as_message};

use datafusion_common::{
    arrow_datafusion_err,
    config::{
        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
        TableParquetOptions,
    },
    file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
    not_impl_err,
    parsers::CompressionTypeVariant,
    stats::Precision,
    Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
};

#[derive(Debug)]
pub enum Error {
    General(String),

    DataFusionError(DataFusionError),

    MissingRequiredField(String),

    AtLeastOneValue(String),

    UnknownEnumVariant { name: String, value: i32 },
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::General(desc) => write!(f, "General error: {desc}"),

            Self::DataFusionError(desc) => {
                write!(f, "DataFusion error: {desc:?}")
            }

            Self::MissingRequiredField(name) => {
                write!(f, "Missing required field {name}")
            }
            Self::AtLeastOneValue(name) => {
                write!(f, "Must have at least one {name}, found 0")
            }
            Self::UnknownEnumVariant { name, value } => {
                write!(f, "Unknown i32 value for {name} enum: {value}")
            }
        }
    }
}

impl std::error::Error for Error {}

impl From<DataFusionError> for Error {
    fn from(e: DataFusionError) -> Self {
        Error::DataFusionError(e)
    }
}

impl Error {
    pub fn required(field: impl Into<String>) -> Error {
        Error::MissingRequiredField(field.into())
    }

    pub fn unknown(name: impl Into<String>, value: i32) -> Error {
        Error::UnknownEnumVariant {
            name: name.into(),
            value,
        }
    }
}

impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}

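/// Converts an optional protobuf field into a concrete value, treating a
/// missing field either as `None` (`optional`) or as an error (`required`).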
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field to `Option<T>`, returning an error
    /// only if the field is present and the conversion fails.
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field to `T`, returning
    /// [`Error::MissingRequiredField`] named after `field` if the value is `None`.
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}

impl<T, U> FromOptionalField<U> for Option<T>
where
    T: TryInto<U, Error = Error>,
{
    fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
        self.map(|t| t.try_into()).transpose()
    }

    fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
        match self {
            None => Err(Error::required(field)),
            Some(t) => t.try_into(),
        }
    }
}

impl From<protobuf::Column> for Column {
    fn from(c: protobuf::Column) -> Self {
        let protobuf::Column { relation, name } = c;

        Self::new(relation.map(|r| r.relation), name)
    }
}

impl From<&protobuf::Column> for Column {
    fn from(c: &protobuf::Column) -> Self {
        c.clone().into()
    }
}

impl TryFrom<&protobuf::DfSchema> for DFSchema {
    type Error = Error;

    fn try_from(
        df_schema: &protobuf::DfSchema,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let df_fields = df_schema.columns.clone();
        let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
            .iter()
            .map(|df_field| {
                let field: Field = df_field.field.as_ref().required("field")?;
                Ok((
                    df_field
                        .qualifier
                        .as_ref()
                        .map(|q| q.relation.clone().into()),
                    Arc::new(field),
                ))
            })
            .collect::<datafusion_common::Result<Vec<_>, Error>>()?;

        Ok(DFSchema::new_with_metadata(
            qualifiers_and_fields,
            df_schema.metadata.clone(),
        )?)
    }
}

impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
    type Error = Error;

    fn try_from(
        df_schema: protobuf::DfSchema,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let dfschema: DFSchema = (&df_schema).try_into()?;
        Ok(Arc::new(dfschema))
    }
}

impl TryFrom<&protobuf::ArrowType> for DataType {
    type Error = Error;

    fn try_from(
        arrow_type: &protobuf::ArrowType,
    ) -> datafusion_common::Result<Self, Self::Error> {
        arrow_type
            .arrow_type_enum
            .as_ref()
            .required("arrow_type_enum")
    }
}

impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
                precision,
                scale,
            }) => DataType::Decimal32(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
                precision,
                scale,
            }) => DataType::Decimal64(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // Default to consecutive type ids when none are provided.
                let type_ids: Vec<_> = match union.type_ids.is_empty() {
                    true => (0..union_fields.len() as i8).collect(),
                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
                };

                DataType::Union(UnionFields::new(type_ids, union_fields), union_mode)
            }
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
        })
    }
}

impl TryFrom<&protobuf::Field> for Field {
    type Error = Error;
    fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
        let datatype = field.arrow_type.as_deref().required("arrow_type")?;
        let field = Self::new(field.name.as_str(), datatype, field.nullable)
            .with_metadata(field.metadata.clone());
        Ok(field)
    }
}

impl TryFrom<&protobuf::Schema> for Schema {
    type Error = Error;

    fn try_from(
        schema: &protobuf::Schema,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let fields = schema
            .columns
            .iter()
            .map(Field::try_from)
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
        Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
    }
}

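// Nested `ScalarValue`s (lists, structs, maps) arrive as single-column Arrow
// IPC record batches; the `ListValue` arm below decodes the IPC message,
// including any dictionary batches it references.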
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing ScalarValue::List"
                            .to_string(),
                    ));
                };

                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing ScalarValue::List: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing ScalarValue::List"
                            .to_string(),
                    )
                })?;

                // Decode dictionary batches first so dictionary-encoded
                // columns in the record batch can be resolved by id.
                let dict_by_id: HashMap<i64, ArrayRef> = dictionaries
                    .iter()
                    .map(
                        |protobuf::scalar_nested_value::Dictionary {
                             ipc_message,
                             arrow_data,
                         }| {
                            let message =
                                root_as_message(ipc_message.as_slice()).map_err(|e| {
                                    Error::General(format!(
                                        "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
                                    ))
                                })?;
                            let buffer = Buffer::from(arrow_data.as_slice());

                            let dict_batch = message
                                .header_as_dictionary_batch()
                                .ok_or_else(|| {
                                    Error::General(
                                        "Unexpected message type deserializing ScalarValue::List dictionary message"
                                            .to_string(),
                                    )
                                })?;

                            let id = dict_batch.id();

                            let record_batch = read_record_batch(
                                &buffer,
                                dict_batch.data().unwrap(),
                                Arc::new(schema.clone()),
                                &Default::default(),
                                None,
                                &message.version(),
                            )?;

                            let values: ArrayRef = Arc::clone(record_batch.column(0));

                            Ok((id, values))
                        },
                    )
                    .collect::<datafusion_common::Result<HashMap<_, _>>>()?;

                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
                let arr = record_batch.column(0);
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // Only the five nested variants matched by the outer arm
                    // can reach this inner match.
                    _ => unreachable!(),
                }
            }
            Value::NullValue(v) => {
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            Value::Decimal32Value(_val) => {
                return not_impl_err!("Decimal32 protobuf deserialization")
                    .map_err(Error::DataFusionError)
            }
            Value::Decimal64Value(_val) => {
                return not_impl_err!("Decimal64 protobuf deserialization")
                    .map_err(Error::DataFusionError)
            }
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let fields = UnionFields::new(ids, fields);
                let v_id = val.value_id as i8;
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                Self::FixedSizeBinary(v.length, Some(v.values.clone()))
            }
        })
    }
}

impl From<protobuf::TimeUnit> for TimeUnit {
    fn from(time_unit: protobuf::TimeUnit) -> Self {
        match time_unit {
            protobuf::TimeUnit::Second => TimeUnit::Second,
            protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
            protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
            protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
        }
    }
}

impl From<protobuf::IntervalUnit> for IntervalUnit {
    fn from(interval_unit: protobuf::IntervalUnit) -> Self {
        match interval_unit {
            protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
            protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
            protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
        }
    }
}

impl From<protobuf::Constraints> for Constraints {
    fn from(constraints: protobuf::Constraints) -> Self {
        Constraints::new_unverified(
            constraints
                .constraints
                .into_iter()
                .map(|item| item.into())
                .collect(),
        )
    }
}

impl From<protobuf::Constraint> for Constraint {
    fn from(value: protobuf::Constraint) -> Self {
        match value.constraint_mode.expect("constraint_mode not set") {
            protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
                Constraint::PrimaryKey(
                    elem.indices.into_iter().map(|item| item as usize).collect(),
                )
            }
            protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
                elem.indices.into_iter().map(|item| item as usize).collect(),
            ),
        }
    }
}

impl From<&protobuf::ColumnStats> for ColumnStatistics {
    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
        ColumnStatistics {
            null_count: if let Some(nc) = &cs.null_count {
                nc.clone().into()
            } else {
                Precision::Absent
            },
            max_value: if let Some(max) = &cs.max_value {
                max.clone().into()
            } else {
                Precision::Absent
            },
            min_value: if let Some(min) = &cs.min_value {
                min.clone().into()
            } else {
                Precision::Absent
            },
            sum_value: if let Some(sum) = &cs.sum_value {
                sum.clone().into()
            } else {
                Precision::Absent
            },
            distinct_count: if let Some(dc) = &cs.distinct_count {
                dc.clone().into()
            } else {
                Precision::Absent
            },
        }
    }
}

impl From<protobuf::Precision> for Precision<usize> {
    fn from(s: protobuf::Precision) -> Self {
        let Ok(precision_type) = s.precision_info.try_into() else {
            return Precision::Absent;
        };
        match precision_type {
            protobuf::PrecisionInfo::Exact => {
                if let Some(val) = s.val {
                    if let Ok(ScalarValue::UInt64(Some(val))) =
                        ScalarValue::try_from(&val)
                    {
                        Precision::Exact(val as usize)
                    } else {
                        Precision::Absent
                    }
                } else {
                    Precision::Absent
                }
            }
            protobuf::PrecisionInfo::Inexact => {
                if let Some(val) = s.val {
                    if let Ok(ScalarValue::UInt64(Some(val))) =
                        ScalarValue::try_from(&val)
                    {
                        Precision::Inexact(val as usize)
                    } else {
                        Precision::Absent
                    }
                } else {
                    Precision::Absent
                }
            }
            protobuf::PrecisionInfo::Absent => Precision::Absent,
        }
    }
}

impl From<protobuf::Precision> for Precision<ScalarValue> {
    fn from(s: protobuf::Precision) -> Self {
        let Ok(precision_type) = s.precision_info.try_into() else {
            return Precision::Absent;
        };
        match precision_type {
            protobuf::PrecisionInfo::Exact => {
                if let Some(val) = s.val {
                    if let Ok(val) = ScalarValue::try_from(&val) {
                        Precision::Exact(val)
                    } else {
                        Precision::Absent
                    }
                } else {
                    Precision::Absent
                }
            }
            protobuf::PrecisionInfo::Inexact => {
                if let Some(val) = s.val {
                    if let Ok(val) = ScalarValue::try_from(&val) {
                        Precision::Inexact(val)
                    } else {
                        Precision::Absent
                    }
                } else {
                    Precision::Absent
                }
            }
            protobuf::PrecisionInfo::Absent => Precision::Absent,
        }
    }
}

impl From<protobuf::JoinSide> for JoinSide {
    fn from(t: protobuf::JoinSide) -> Self {
        match t {
            protobuf::JoinSide::LeftSide => JoinSide::Left,
            protobuf::JoinSide::RightSide => JoinSide::Right,
            protobuf::JoinSide::None => JoinSide::None,
        }
    }
}

impl From<&protobuf::Constraint> for Constraint {
    fn from(value: &protobuf::Constraint) -> Self {
        match &value.constraint_mode {
            Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
                Constraint::PrimaryKey(
                    elem.indices.iter().map(|&item| item as usize).collect(),
                )
            }
            Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
                Constraint::Unique(
                    elem.indices.iter().map(|&item| item as usize).collect(),
                )
            }
            None => panic!("constraint_mode not set"),
        }
    }
}

impl TryFrom<&protobuf::Constraints> for Constraints {
    type Error = DataFusionError;

    fn try_from(
        constraints: &protobuf::Constraints,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(Constraints::new_unverified(
            constraints
                .constraints
                .iter()
                .map(|item| item.into())
                .collect(),
        ))
    }
}

impl TryFrom<&protobuf::Statistics> for Statistics {
    type Error = DataFusionError;

    fn try_from(
        s: &protobuf::Statistics,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(Statistics {
            num_rows: if let Some(nr) = &s.num_rows {
                nr.clone().into()
            } else {
                Precision::Absent
            },
            total_byte_size: if let Some(tbs) = &s.total_byte_size {
                tbs.clone().into()
            } else {
                Precision::Absent
            },
            column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
        })
    }
}

impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
    fn from(value: protobuf::CompressionTypeVariant) -> Self {
        match value {
            protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
            protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
            protobuf::CompressionTypeVariant::Xz => Self::XZ,
            protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
            protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
        }
    }
}

impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
    fn from(value: CompressionTypeVariant) -> Self {
        match value {
            CompressionTypeVariant::GZIP => Self::Gzip,
            CompressionTypeVariant::BZIP2 => Self::Bzip2,
            CompressionTypeVariant::XZ => Self::Xz,
            CompressionTypeVariant::ZSTD => Self::Zstd,
            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
        }
    }
}

impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &protobuf::CsvWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let write_options = csv_writer_options_from_proto(opts)?;
        let compression: CompressionTypeVariant = opts.compression().into();
        Ok(CsvWriterOptions::new(write_options, compression))
    }
}

impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &protobuf::JsonWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: CompressionTypeVariant = opts.compression().into();
        Ok(JsonWriterOptions::new(compression))
    }
}

impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
    type Error = DataFusionError;

    fn try_from(
        proto_opts: &protobuf::CsvOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(CsvOptions {
            has_header: proto_opts.has_header.first().map(|h| *h != 0),
            delimiter: proto_opts.delimiter[0],
            quote: proto_opts.quote[0],
            terminator: proto_opts.terminator.first().copied(),
            escape: proto_opts.escape.first().copied(),
            double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
            newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
            compression: proto_opts.compression().into(),
            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
            date_format: (!proto_opts.date_format.is_empty())
                .then(|| proto_opts.date_format.clone()),
            datetime_format: (!proto_opts.datetime_format.is_empty())
                .then(|| proto_opts.datetime_format.clone()),
            timestamp_format: (!proto_opts.timestamp_format.is_empty())
                .then(|| proto_opts.timestamp_format.clone()),
            timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
                .then(|| proto_opts.timestamp_tz_format.clone()),
            time_format: (!proto_opts.time_format.is_empty())
                .then(|| proto_opts.time_format.clone()),
            null_value: (!proto_opts.null_value.is_empty())
                .then(|| proto_opts.null_value.clone()),
            null_regex: (!proto_opts.null_regex.is_empty())
                .then(|| proto_opts.null_regex.clone()),
            comment: proto_opts.comment.first().copied(),
        })
    }
}

impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
    type Error = DataFusionError;

    fn try_from(
        value: &protobuf::ParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        #[allow(deprecated)]
        Ok(ParquetOptions {
            enable_page_index: value.enable_page_index,
            pruning: value.pruning,
            skip_metadata: value.skip_metadata,
            metadata_size_hint: value
                .metadata_size_hint_opt
                .map(|opt| match opt {
                    protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => {
                        Some(v as usize)
                    }
                })
                .unwrap_or(None),
            pushdown_filters: value.pushdown_filters,
            reorder_filters: value.reorder_filters,
            data_pagesize_limit: value.data_pagesize_limit as usize,
            write_batch_size: value.write_batch_size as usize,
            writer_version: value.writer_version.clone(),
            compression: value
                .compression_opt
                .clone()
                .map(|opt| match opt {
                    protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
                })
                .unwrap_or(None),
            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(
                |protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v,
            ),
            dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
            statistics_enabled: value
                .statistics_enabled_opt
                .clone()
                .map(|opt| match opt {
                    protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => {
                        Some(v)
                    }
                })
                .unwrap_or(None),
            max_row_group_size: value.max_row_group_size as usize,
            created_by: value.created_by.clone(),
            column_index_truncate_length: value
                .column_index_truncate_length_opt
                .as_ref()
                .map(|opt| match opt {
                    protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => {
                        Some(*v as usize)
                    }
                })
                .unwrap_or(None),
            statistics_truncate_length: value
                .statistics_truncate_length_opt
                .as_ref()
                .map(|opt| match opt {
                    protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => {
                        Some(*v as usize)
                    }
                })
                .unwrap_or(None),
            data_page_row_count_limit: value.data_page_row_count_limit as usize,
            encoding: value
                .encoding_opt
                .clone()
                .map(|opt| match opt {
                    protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
                })
                .unwrap_or(None),
            bloom_filter_on_read: value.bloom_filter_on_read,
            bloom_filter_on_write: value.bloom_filter_on_write,
            bloom_filter_fpp: value
                .bloom_filter_fpp_opt
                .map(|opt| match opt {
                    protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
                })
                .unwrap_or(None),
            bloom_filter_ndv: value
                .bloom_filter_ndv_opt
                .map(|opt| match opt {
                    protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
                })
                .unwrap_or(None),
            allow_single_file_parallelism: value.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers
                as usize,
            maximum_buffered_record_batches_per_stream: value
                .maximum_buffered_record_batches_per_stream
                as usize,
            schema_force_view_types: value.schema_force_view_types,
            binary_as_string: value.binary_as_string,
            coerce_int96: value
                .coerce_int96_opt
                .clone()
                .map(|opt| match opt {
                    protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
                })
                .unwrap_or(None),
            skip_arrow_metadata: value.skip_arrow_metadata,
        })
    }
}

impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
    type Error = DataFusionError;
    fn try_from(
        value: &protobuf::ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        #[allow(deprecated)]
        Ok(ParquetColumnOptions {
            compression: value
                .compression_opt
                .clone()
                .map(|opt| match opt {
                    protobuf::parquet_column_options::CompressionOpt::Compression(v) => {
                        Some(v)
                    }
                })
                .unwrap_or(None),
            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(
                |protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v,
            ),
            statistics_enabled: value
                .statistics_enabled_opt
                .clone()
                .map(|opt| match opt {
                    protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => {
                        Some(v)
                    }
                })
                .unwrap_or(None),
            encoding: value
                .encoding_opt
                .clone()
                .map(|opt| match opt {
                    protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
                })
                .unwrap_or(None),
            bloom_filter_enabled: value
                .bloom_filter_enabled_opt
                .map(|opt| match opt {
                    protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => {
                        Some(v)
                    }
                })
                .unwrap_or(None),
            bloom_filter_fpp: value
                .bloom_filter_fpp_opt
                .map(|opt| match opt {
                    protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => {
                        Some(v)
                    }
                })
                .unwrap_or(None),
            bloom_filter_ndv: value
                .bloom_filter_ndv_opt
                .map(|opt| match opt {
                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => {
                        Some(v)
                    }
                })
                .unwrap_or(None),
        })
    }
}

impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
    type Error = DataFusionError;
    fn try_from(
        value: &protobuf::TableParquetOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
            HashMap::new();
        for protobuf::ParquetColumnSpecificOptions {
            column_name,
            options: maybe_options,
        } in &value.column_specific_options
        {
            if let Some(options) = maybe_options {
                column_specific_options.insert(column_name.clone(), options.try_into()?);
            }
        }
        Ok(TableParquetOptions {
            global: value
                .global
                .as_ref()
                .map(|v| v.try_into())
                .transpose()?
                .unwrap_or_default(),
            column_specific_options,
            key_value_metadata: Default::default(),
            crypto: Default::default(),
        })
    }
}

impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
    type Error = DataFusionError;

    fn try_from(
        proto_opts: &protobuf::JsonOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
        Ok(JsonOptions {
            compression: compression.into(),
            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
        })
    }
}

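/// Converts an i32 protobuf enum value into a [`TimeUnit`], returning
/// [`Error::UnknownEnumVariant`] for out-of-range values.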
pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
    protobuf::TimeUnit::try_from(*value)
        .map(|t| t.into())
        .map_err(|_| Error::unknown("TimeUnit", *value))
}

/// Converts an i32 protobuf enum value into an [`IntervalUnit`], returning
/// [`Error::UnknownEnumVariant`] for out-of-range values.
pub fn parse_i32_to_interval_unit(
    value: &i32,
) -> datafusion_common::Result<IntervalUnit, Error> {
    protobuf::IntervalUnit::try_from(*value)
        .map(|t| t.into())
        .map_err(|_| Error::unknown("IntervalUnit", *value))
}

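/// Converts a `Vec<T>` into a fixed-size array `[T; N]`, panicking if the
/// length does not match `N` (used above for decimal byte buffers).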
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    v.try_into().unwrap_or_else(|v: Vec<T>| {
        panic!("Expected a Vec of length {} but it was {}", N, v.len())
    })
}

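/// Converts a collection of protobuf `Field`s into Arrow [`Field`]s, failing
/// on the first field that cannot be converted.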
pub fn parse_proto_fields_to_fields<'a, I>(
    fields: I,
) -> std::result::Result<Vec<Field>, Error>
where
    I: IntoIterator<Item = &'a protobuf::Field>,
{
    fields
        .into_iter()
        .map(Field::try_from)
        .collect::<datafusion_common::Result<_, _>>()
}

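/// Builds an Arrow CSV [`WriterBuilder`] from protobuf writer options,
/// validating that the delimiter, quote, and escape characters are ASCII.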
pub(crate) fn csv_writer_options_from_proto(
    writer_options: &protobuf::CsvWriterOptions,
) -> datafusion_common::Result<WriterBuilder> {
    let mut builder = WriterBuilder::new();
    if !writer_options.delimiter.is_empty() {
        if let Some(delimiter) = writer_options.delimiter.chars().next() {
            if delimiter.is_ascii() {
                builder = builder.with_delimiter(delimiter as u8);
            } else {
                return Err(proto_error("CSV Delimiter is not ASCII"));
            }
        } else {
            return Err(proto_error("Error parsing CSV Delimiter"));
        }
    }
    if !writer_options.quote.is_empty() {
        if let Some(quote) = writer_options.quote.chars().next() {
            if quote.is_ascii() {
                builder = builder.with_quote(quote as u8);
            } else {
                return Err(proto_error("CSV Quote is not ASCII"));
            }
        } else {
            return Err(proto_error("Error parsing CSV Quote"));
        }
    }
    if !writer_options.escape.is_empty() {
        if let Some(escape) = writer_options.escape.chars().next() {
            if escape.is_ascii() {
                builder = builder.with_escape(escape as u8);
            } else {
                return Err(proto_error("CSV Escape is not ASCII"));
            }
        } else {
            return Err(proto_error("Error parsing CSV Escape"));
        }
    }
    Ok(builder
        .with_header(writer_options.has_header)
        .with_date_format(writer_options.date_format.clone())
        .with_datetime_format(writer_options.datetime_format.clone())
        .with_timestamp_format(writer_options.timestamp_format.clone())
        .with_time_format(writer_options.time_format.clone())
        .with_null(writer_options.null_value.clone())
        .with_double_quote(writer_options.double_quote))
}