1use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28 i256, DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
29 Schema, TimeUnit, UnionFields, UnionMode,
30};
31use arrow::ipc::{reader::read_record_batch, root_as_message};
32
33use datafusion_common::{
34 arrow_datafusion_err,
35 config::{
36 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
37 TableParquetOptions,
38 },
39 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
40 parsers::CompressionTypeVariant,
41 plan_datafusion_err,
42 stats::Precision,
43 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
44 DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
45};
46
#[derive(Debug)]
/// Errors that can occur while decoding protobuf messages into
/// DataFusion / Arrow native types.
pub enum Error {
    /// Catch-all for decoding failures that have no dedicated variant.
    General(String),

    /// An error propagated from a DataFusion operation during decoding.
    DataFusionError(DataFusionError),

    /// A protobuf field that the conversion requires was not set.
    MissingRequiredField(String),

    /// A repeated field that must be non-empty was empty.
    AtLeastOneValue(String),

    /// A raw i32 enum tag did not correspond to any known variant.
    UnknownEnumVariant { name: String, value: i32 },
}
59
60impl std::fmt::Display for Error {
61 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62 match self {
63 Self::General(desc) => write!(f, "General error: {desc}"),
64
65 Self::DataFusionError(desc) => {
66 write!(f, "DataFusion error: {desc:?}")
67 }
68
69 Self::MissingRequiredField(name) => {
70 write!(f, "Missing required field {name}")
71 }
72 Self::AtLeastOneValue(name) => {
73 write!(f, "Must have at least one {name}, found 0")
74 }
75 Self::UnknownEnumVariant { name, value } => {
76 write!(f, "Unknown i32 value for {name} enum: {value}")
77 }
78 }
79 }
80}
81
// Marker impl: `Debug` + `Display` above satisfy the `std::error::Error` contract.
impl std::error::Error for Error {}
83
84impl From<DataFusionError> for Error {
85 fn from(e: DataFusionError) -> Self {
86 Error::DataFusionError(e)
87 }
88}
89
90impl Error {
91 pub fn required(field: impl Into<String>) -> Error {
92 Error::MissingRequiredField(field.into())
93 }
94
95 pub fn unknown(name: impl Into<String>, value: i32) -> Error {
96 Error::UnknownEnumVariant {
97 name: name.into(),
98 value,
99 }
100 }
101}
102
impl From<Error> for DataFusionError {
    fn from(e: Error) -> Self {
        // Surface proto-decoding failures to callers as planning errors,
        // using the Display rendering of `Error` as the message.
        plan_datafusion_err!("{}", e)
    }
}
108
/// Converts an optional protobuf field into a native value, treating a
/// missing field as either `None` or a hard error depending on the call site.
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field to `Option<T>`.
    ///
    /// Returns `None` if the field is absent, otherwise applies
    /// [`TryInto::try_into`] to the contained value, propagating any error.
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field to `T`, returning
    /// [`Error::MissingRequiredField`] (named by `field`) if it is absent.
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
124
125impl<T, U> FromOptionalField<U> for Option<T>
126where
127 T: TryInto<U, Error = Error>,
128{
129 fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
130 self.map(|t| t.try_into()).transpose()
131 }
132
133 fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
134 match self {
135 None => Err(Error::required(field)),
136 Some(t) => t.try_into(),
137 }
138 }
139}
140
impl From<protobuf::ColumnRelation> for TableReference {
    fn from(rel: protobuf::ColumnRelation) -> Self {
        // The relation is transported as a single (possibly dotted) string;
        // parse it back into a table reference. The `true` flag enables
        // identifier normalization — presumably lowercasing unquoted
        // identifiers; confirm against `TableReference::parse_str_normalized`.
        Self::parse_str_normalized(rel.relation.as_str(), true)
    }
}
146
147impl From<protobuf::Column> for Column {
148 fn from(c: protobuf::Column) -> Self {
149 let protobuf::Column { relation, name } = c;
150
151 Self::new(relation, name)
152 }
153}
154
155impl From<&protobuf::Column> for Column {
156 fn from(c: &protobuf::Column) -> Self {
157 c.clone().into()
158 }
159}
160
161impl TryFrom<&protobuf::DfSchema> for DFSchema {
162 type Error = Error;
163
164 fn try_from(
165 df_schema: &protobuf::DfSchema,
166 ) -> datafusion_common::Result<Self, Self::Error> {
167 let df_fields = df_schema.columns.clone();
168 let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
169 .iter()
170 .map(|df_field| {
171 let field: Field = df_field.field.as_ref().required("field")?;
172 Ok((
173 df_field.qualifier.as_ref().map(|q| q.clone().into()),
174 Arc::new(field),
175 ))
176 })
177 .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
178
179 Ok(DFSchema::new_with_metadata(
180 qualifiers_and_fields,
181 df_schema.metadata.clone(),
182 )?)
183 }
184}
185
186impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
187 type Error = Error;
188
189 fn try_from(
190 df_schema: protobuf::DfSchema,
191 ) -> datafusion_common::Result<Self, Self::Error> {
192 let dfschema: DFSchema = (&df_schema).try_into()?;
193 Ok(Arc::new(dfschema))
194 }
195}
196
impl TryFrom<&protobuf::ArrowType> for DataType {
    type Error = Error;

    /// Unwraps the `arrow_type_enum` oneof and delegates to the
    /// `TryFrom<&ArrowTypeEnum>` impl below; an unset oneof is an error.
    fn try_from(
        arrow_type: &protobuf::ArrowType,
    ) -> datafusion_common::Result<Self, Self::Error> {
        arrow_type
            .arrow_type_enum
            .as_ref()
            .required("arrow_type_enum")
    }
}
209
impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    /// Maps every protobuf arrow-type variant onto the corresponding Arrow
    /// [`DataType`]. Nested types (list/struct/union/dictionary/map) recurse
    /// through the helper conversions; enum tags are validated as they are
    /// decoded.
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            // Primitive types carry no payload beyond the variant itself.
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            // Temporal types: the i32 unit tag is validated by the parse helper.
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                // Empty string encodes "no timezone" on the wire.
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            // Decimals: precision/scale are widened to i32/i64 on the wire and
            // narrowed back here (values originate from u8/i8, so the casts
            // are lossless for well-formed messages).
            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
                precision,
                scale,
            }) => DataType::Decimal32(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
                precision,
                scale,
            }) => DataType::Decimal64(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            // List family: the element field is required and decoded recursively.
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                // Validate the i32 mode tag before translating it.
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // Older payloads may omit explicit type ids; default to 0..n.
                let type_ids: Vec<_> = match union.type_ids.is_empty() {
                    true => (0..union_fields.len() as i8).collect(),
                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
                };

                DataType::Union(UnionFields::new(type_ids, union_fields), union_mode)
            }
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
        })
    }
}
329
330impl TryFrom<&protobuf::Field> for Field {
331 type Error = Error;
332 fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
333 let datatype = field.arrow_type.as_deref().required("arrow_type")?;
334 let field = Self::new(field.name.as_str(), datatype, field.nullable)
335 .with_metadata(field.metadata.clone());
336 Ok(field)
337 }
338}
339
340impl TryFrom<&protobuf::Schema> for Schema {
341 type Error = Error;
342
343 fn try_from(
344 schema: &protobuf::Schema,
345 ) -> datafusion_common::Result<Self, Self::Error> {
346 let fields = schema
347 .columns
348 .iter()
349 .map(Field::try_from)
350 .collect::<datafusion_common::Result<Vec<_>, _>>()?;
351 Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
352 }
353}
354
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    /// Decodes a protobuf scalar into a DataFusion [`ScalarValue`].
    ///
    /// Simple variants map field-by-field. Nested variants (list, large list,
    /// fixed-size list, struct, map) are transported as an Arrow IPC record
    /// batch (schema + message + body bytes + optional dictionaries) and are
    /// reconstructed with `read_record_batch`.
    ///
    /// # Errors
    /// Returns [`Error`] when a required field is missing, an enum tag is
    /// unknown, or an embedded IPC payload cannot be decoded.
    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        // The `value` oneof must always be set; even NULL scalars use the
        // dedicated `NullValue` variant below.
        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            // 8/16-bit ints travel widened as i32/u32 on the wire; narrow back.
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // All nested variants share one wire format (ScalarNestedValue);
            // the outer tag is re-inspected after decoding to pick the
            // ScalarValue variant.
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing ScalarValue::List"
                            .to_string(),
                    ));
                };

                // Parse the flatbuffer IPC envelope for the value batch.
                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing ScalarValue::List: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing ScalarValue::List"
                            .to_string(),
                    )
                })?;

                // Decode each serialized dictionary batch into (dict id ->
                // values array) so the main batch can resolve dictionary
                // encoded columns. NOTE(review): each dictionary is decoded
                // with the full value schema and only column 0 is kept —
                // presumably matches how it was serialized; confirm against
                // the encoding side.
                let dict_by_id: HashMap<i64,ArrayRef> = dictionaries.iter().map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                        Error::General(format!(
                            "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
                        ))
                    })?;
                    let buffer = Buffer::from(arrow_data.as_slice());

                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing ScalarValue::List dictionary message"
                                .to_string(),
                        )
                    })?;

                    let id = dict_batch.id();

                    // `data()` is assumed present on a dictionary batch header
                    // — TODO confirm this cannot be None for valid payloads.
                    let record_batch = read_record_batch(
                        &buffer,
                        dict_batch.data().unwrap(),
                        Arc::new(schema.clone()),
                        &Default::default(),
                        None,
                        &message.version(),
                    )?;

                    let values: ArrayRef = Arc::clone(record_batch.column(0));

                    Ok((id, values))
                }).collect::<datafusion_common::Result<HashMap<_, _>>>()?;

                // Decode the single-column batch holding the nested value.
                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
                let arr = record_batch.column(0);
                // Re-dispatch on the original oneof tag to downcast the array.
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // Outer match already restricted to the five variants above.
                    _ => unreachable!(),
                }
            }
            Value::NullValue(v) => {
                // A typed NULL: decode the DataType, then build the null
                // scalar of that type.
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            // Decimal payloads are big-endian byte arrays of the exact width.
            Value::Decimal32Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal64Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                // Inner oneof selects second vs millisecond resolution.
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                // Inner oneof selects microsecond vs nanosecond resolution.
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // Empty string encodes "no timezone" on the wire.
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                // The dictionary value is itself a boxed ScalarValue; recurse.
                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                // Union mode is a raw i32: 0 = sparse, 1 = dense.
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // collect::<Option<Vec<_>>> fails if any field entry lacks
                // its Field payload.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let fields = UnionFields::new(ids, fields);
                let v_id = val.value_id as i8;
                // A union scalar may be valueless (None) or carry one typed value.
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                Self::FixedSizeBinary(v.length, Some(v.clone().values))
            }
        })
    }
}
625
626impl From<protobuf::TimeUnit> for TimeUnit {
627 fn from(time_unit: protobuf::TimeUnit) -> Self {
628 match time_unit {
629 protobuf::TimeUnit::Second => TimeUnit::Second,
630 protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
631 protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
632 protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
633 }
634 }
635}
636
637impl From<protobuf::IntervalUnit> for IntervalUnit {
638 fn from(interval_unit: protobuf::IntervalUnit) -> Self {
639 match interval_unit {
640 protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
641 protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
642 protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
643 }
644 }
645}
646
647impl From<protobuf::Constraints> for Constraints {
648 fn from(constraints: protobuf::Constraints) -> Self {
649 Constraints::new_unverified(
650 constraints
651 .constraints
652 .into_iter()
653 .map(|item| item.into())
654 .collect(),
655 )
656 }
657}
658
659impl From<protobuf::Constraint> for Constraint {
660 fn from(value: protobuf::Constraint) -> Self {
661 match value.constraint_mode.unwrap() {
662 protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
663 Constraint::PrimaryKey(
664 elem.indices.into_iter().map(|item| item as usize).collect(),
665 )
666 }
667 protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
668 elem.indices.into_iter().map(|item| item as usize).collect(),
669 ),
670 }
671 }
672}
673
674impl From<&protobuf::ColumnStats> for ColumnStatistics {
675 fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
676 ColumnStatistics {
677 null_count: if let Some(nc) = &cs.null_count {
678 nc.clone().into()
679 } else {
680 Precision::Absent
681 },
682 max_value: if let Some(max) = &cs.max_value {
683 max.clone().into()
684 } else {
685 Precision::Absent
686 },
687 min_value: if let Some(min) = &cs.min_value {
688 min.clone().into()
689 } else {
690 Precision::Absent
691 },
692 sum_value: if let Some(sum) = &cs.sum_value {
693 sum.clone().into()
694 } else {
695 Precision::Absent
696 },
697 distinct_count: if let Some(dc) = &cs.distinct_count {
698 dc.clone().into()
699 } else {
700 Precision::Absent
701 },
702 }
703 }
704}
705
706impl From<protobuf::Precision> for Precision<usize> {
707 fn from(s: protobuf::Precision) -> Self {
708 let Ok(precision_type) = s.precision_info.try_into() else {
709 return Precision::Absent;
710 };
711 match precision_type {
712 protobuf::PrecisionInfo::Exact => {
713 if let Some(val) = s.val {
714 if let Ok(ScalarValue::UInt64(Some(val))) =
715 ScalarValue::try_from(&val)
716 {
717 Precision::Exact(val as usize)
718 } else {
719 Precision::Absent
720 }
721 } else {
722 Precision::Absent
723 }
724 }
725 protobuf::PrecisionInfo::Inexact => {
726 if let Some(val) = s.val {
727 if let Ok(ScalarValue::UInt64(Some(val))) =
728 ScalarValue::try_from(&val)
729 {
730 Precision::Inexact(val as usize)
731 } else {
732 Precision::Absent
733 }
734 } else {
735 Precision::Absent
736 }
737 }
738 protobuf::PrecisionInfo::Absent => Precision::Absent,
739 }
740 }
741}
742
743impl From<protobuf::Precision> for Precision<ScalarValue> {
744 fn from(s: protobuf::Precision) -> Self {
745 let Ok(precision_type) = s.precision_info.try_into() else {
746 return Precision::Absent;
747 };
748 match precision_type {
749 protobuf::PrecisionInfo::Exact => {
750 if let Some(val) = s.val {
751 if let Ok(val) = ScalarValue::try_from(&val) {
752 Precision::Exact(val)
753 } else {
754 Precision::Absent
755 }
756 } else {
757 Precision::Absent
758 }
759 }
760 protobuf::PrecisionInfo::Inexact => {
761 if let Some(val) = s.val {
762 if let Ok(val) = ScalarValue::try_from(&val) {
763 Precision::Inexact(val)
764 } else {
765 Precision::Absent
766 }
767 } else {
768 Precision::Absent
769 }
770 }
771 protobuf::PrecisionInfo::Absent => Precision::Absent,
772 }
773 }
774}
775
776impl From<protobuf::JoinSide> for JoinSide {
777 fn from(t: protobuf::JoinSide) -> Self {
778 match t {
779 protobuf::JoinSide::LeftSide => JoinSide::Left,
780 protobuf::JoinSide::RightSide => JoinSide::Right,
781 protobuf::JoinSide::None => JoinSide::None,
782 }
783 }
784}
785
786impl From<&protobuf::Constraint> for Constraint {
787 fn from(value: &protobuf::Constraint) -> Self {
788 match &value.constraint_mode {
789 Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
790 Constraint::PrimaryKey(
791 elem.indices.iter().map(|&item| item as usize).collect(),
792 )
793 }
794 Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
795 Constraint::Unique(
796 elem.indices.iter().map(|&item| item as usize).collect(),
797 )
798 }
799 None => panic!("constraint_mode not set"),
800 }
801 }
802}
803
804impl TryFrom<&protobuf::Constraints> for Constraints {
805 type Error = DataFusionError;
806
807 fn try_from(
808 constraints: &protobuf::Constraints,
809 ) -> datafusion_common::Result<Self, Self::Error> {
810 Ok(Constraints::new_unverified(
811 constraints
812 .constraints
813 .iter()
814 .map(|item| item.into())
815 .collect(),
816 ))
817 }
818}
819
820impl TryFrom<&protobuf::Statistics> for Statistics {
821 type Error = DataFusionError;
822
823 fn try_from(
824 s: &protobuf::Statistics,
825 ) -> datafusion_common::Result<Self, Self::Error> {
826 Ok(Statistics {
828 num_rows: if let Some(nr) = &s.num_rows {
829 nr.clone().into()
830 } else {
831 Precision::Absent
832 },
833 total_byte_size: if let Some(tbs) = &s.total_byte_size {
834 tbs.clone().into()
835 } else {
836 Precision::Absent
837 },
838 column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
840 })
841 }
842}
843
844impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
845 fn from(value: protobuf::CompressionTypeVariant) -> Self {
846 match value {
847 protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
848 protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
849 protobuf::CompressionTypeVariant::Xz => Self::XZ,
850 protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
851 protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
852 }
853 }
854}
855
856impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
857 fn from(value: CompressionTypeVariant) -> Self {
858 match value {
859 CompressionTypeVariant::GZIP => Self::Gzip,
860 CompressionTypeVariant::BZIP2 => Self::Bzip2,
861 CompressionTypeVariant::XZ => Self::Xz,
862 CompressionTypeVariant::ZSTD => Self::Zstd,
863 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
864 }
865 }
866}
867
impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(
        opts: &protobuf::CsvWriterOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        // Build the arrow-csv `WriterBuilder` settings first (delegated to a
        // helper defined elsewhere in this file), then pair them with the
        // decoded compression variant.
        let write_options = csv_writer_options_from_proto(opts)?;
        let compression: CompressionTypeVariant = opts.compression().into();
        Ok(CsvWriterOptions::new(write_options, compression))
    }
}
879
880impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
881 type Error = DataFusionError;
882
883 fn try_from(
884 opts: &protobuf::JsonWriterOptions,
885 ) -> datafusion_common::Result<Self, Self::Error> {
886 let compression: CompressionTypeVariant = opts.compression().into();
887 Ok(JsonWriterOptions::new(compression))
888 }
889}
890
891impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
892 type Error = DataFusionError;
893
894 fn try_from(
895 proto_opts: &protobuf::CsvOptions,
896 ) -> datafusion_common::Result<Self, Self::Error> {
897 Ok(CsvOptions {
898 has_header: proto_opts.has_header.first().map(|h| *h != 0),
899 delimiter: proto_opts.delimiter[0],
900 quote: proto_opts.quote[0],
901 terminator: proto_opts.terminator.first().copied(),
902 escape: proto_opts.escape.first().copied(),
903 double_quote: proto_opts.has_header.first().map(|h| *h != 0),
904 newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
905 compression: proto_opts.compression().into(),
906 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
907 date_format: (!proto_opts.date_format.is_empty())
908 .then(|| proto_opts.date_format.clone()),
909 datetime_format: (!proto_opts.datetime_format.is_empty())
910 .then(|| proto_opts.datetime_format.clone()),
911 timestamp_format: (!proto_opts.timestamp_format.is_empty())
912 .then(|| proto_opts.timestamp_format.clone()),
913 timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
914 .then(|| proto_opts.timestamp_tz_format.clone()),
915 time_format: (!proto_opts.time_format.is_empty())
916 .then(|| proto_opts.time_format.clone()),
917 null_value: (!proto_opts.null_value.is_empty())
918 .then(|| proto_opts.null_value.clone()),
919 null_regex: (!proto_opts.null_regex.is_empty())
920 .then(|| proto_opts.null_regex.clone()),
921 comment: proto_opts.comment.first().copied(),
922 truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
923 })
924 }
925}
926
927impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
928 type Error = DataFusionError;
929
930 fn try_from(
931 value: &protobuf::ParquetOptions,
932 ) -> datafusion_common::Result<Self, Self::Error> {
933 #[allow(deprecated)] Ok(ParquetOptions {
935 enable_page_index: value.enable_page_index,
936 pruning: value.pruning,
937 skip_metadata: value.skip_metadata,
938 metadata_size_hint: value
939 .metadata_size_hint_opt
940 .map(|opt| match opt {
941 protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
942 })
943 .unwrap_or(None),
944 pushdown_filters: value.pushdown_filters,
945 reorder_filters: value.reorder_filters,
946 data_pagesize_limit: value.data_pagesize_limit as usize,
947 write_batch_size: value.write_batch_size as usize,
948 writer_version: value.writer_version.clone(),
949 compression: value.compression_opt.clone().map(|opt| match opt {
950 protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
951 }).unwrap_or(None),
952 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
953 dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
955 statistics_enabled: value
956 .statistics_enabled_opt.clone()
957 .map(|opt| match opt {
958 protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
959 })
960 .unwrap_or(None),
961 max_row_group_size: value.max_row_group_size as usize,
962 created_by: value.created_by.clone(),
963 column_index_truncate_length: value
964 .column_index_truncate_length_opt.as_ref()
965 .map(|opt| match opt {
966 protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
967 })
968 .unwrap_or(None),
969 statistics_truncate_length: value
970 .statistics_truncate_length_opt.as_ref()
971 .map(|opt| match opt {
972 protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
973 })
974 .unwrap_or(None),
975 data_page_row_count_limit: value.data_page_row_count_limit as usize,
976 encoding: value
977 .encoding_opt.clone()
978 .map(|opt| match opt {
979 protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
980 })
981 .unwrap_or(None),
982 bloom_filter_on_read: value.bloom_filter_on_read,
983 bloom_filter_on_write: value.bloom_filter_on_write,
984 bloom_filter_fpp: value.clone()
985 .bloom_filter_fpp_opt
986 .map(|opt| match opt {
987 protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
988 })
989 .unwrap_or(None),
990 bloom_filter_ndv: value.clone()
991 .bloom_filter_ndv_opt
992 .map(|opt| match opt {
993 protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
994 })
995 .unwrap_or(None),
996 allow_single_file_parallelism: value.allow_single_file_parallelism,
997 maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
998 maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
999 schema_force_view_types: value.schema_force_view_types,
1000 binary_as_string: value.binary_as_string,
1001 coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1002 protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1003 }).unwrap_or(None),
1004 skip_arrow_metadata: value.skip_arrow_metadata,
1005 max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1006 protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1007 }).unwrap_or(None),
1008 })
1009 }
1010}
1011
impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
    type Error = DataFusionError;
    /// Decodes per-column Parquet writer overrides from protobuf.
    ///
    /// Each `*_opt` field is a prost `oneof` wrapper encoding an optional
    /// value; absent wrappers map to `None`. String-valued oneofs are cloned;
    /// numeric/bool oneofs are read by value.
    fn try_from(
        value: &protobuf::ParquetColumnOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        // Some ParquetColumnOptions fields are deprecated upstream but must
        // still round-trip through serialization.
        #[allow(deprecated)]
        Ok(ParquetColumnOptions {
            compression: value.compression_opt.clone().map(|opt| match opt {
                protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
            }).unwrap_or(None),
            dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
            statistics_enabled: value
                .statistics_enabled_opt.clone()
                .map(|opt| match opt {
                    protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
                })
                .unwrap_or(None),
            encoding: value
                .encoding_opt.clone()
                .map(|opt| match opt {
                    protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
                })
                .unwrap_or(None),
            bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
                protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
            })
            .unwrap_or(None),
            bloom_filter_fpp: value
                .bloom_filter_fpp_opt
                .map(|opt| match opt {
                    protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
                })
                .unwrap_or(None),
            bloom_filter_ndv: value
                .bloom_filter_ndv_opt
                .map(|opt| match opt {
                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
                })
                .unwrap_or(None),
        })
    }
}
1054
1055impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1056 type Error = DataFusionError;
1057 fn try_from(
1058 value: &protobuf::TableParquetOptions,
1059 ) -> datafusion_common::Result<Self, Self::Error> {
1060 let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1061 HashMap::new();
1062 for protobuf::ParquetColumnSpecificOptions {
1063 column_name,
1064 options: maybe_options,
1065 } in &value.column_specific_options
1066 {
1067 if let Some(options) = maybe_options {
1068 column_specific_options.insert(column_name.clone(), options.try_into()?);
1069 }
1070 }
1071 Ok(TableParquetOptions {
1072 global: value
1073 .global
1074 .as_ref()
1075 .map(|v| v.try_into())
1076 .unwrap()
1077 .unwrap(),
1078 column_specific_options,
1079 key_value_metadata: Default::default(),
1080 crypto: Default::default(),
1081 })
1082 }
1083}
1084
1085impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1086 type Error = DataFusionError;
1087
1088 fn try_from(
1089 proto_opts: &protobuf::JsonOptions,
1090 ) -> datafusion_common::Result<Self, Self::Error> {
1091 let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1092 Ok(JsonOptions {
1093 compression: compression.into(),
1094 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1095 })
1096 }
1097}
1098
1099pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1100 protobuf::TimeUnit::try_from(*value)
1101 .map(|t| t.into())
1102 .map_err(|_| Error::unknown("TimeUnit", *value))
1103}
1104
1105pub fn parse_i32_to_interval_unit(
1106 value: &i32,
1107) -> datafusion_common::Result<IntervalUnit, Error> {
1108 protobuf::IntervalUnit::try_from(*value)
1109 .map(|t| t.into())
1110 .map_err(|_| Error::unknown("IntervalUnit", *value))
1111}
1112
/// Converts a `Vec<T>` into a fixed-size array `[T; N]`.
///
/// # Panics
/// Panics if the vector's length is not exactly `N`.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    match <[T; N]>::try_from(v) {
        Ok(arr) => arr,
        Err(original) => panic!(
            "Expected a Vec of length {} but it was {}",
            N,
            original.len()
        ),
    }
}
1119
1120pub fn parse_proto_fields_to_fields<'a, I>(
1122 fields: I,
1123) -> std::result::Result<Vec<Field>, Error>
1124where
1125 I: IntoIterator<Item = &'a protobuf::Field>,
1126{
1127 fields
1128 .into_iter()
1129 .map(Field::try_from)
1130 .collect::<datafusion_common::Result<_, _>>()
1131}
1132
1133pub(crate) fn csv_writer_options_from_proto(
1134 writer_options: &protobuf::CsvWriterOptions,
1135) -> datafusion_common::Result<WriterBuilder> {
1136 let mut builder = WriterBuilder::new();
1137 if !writer_options.delimiter.is_empty() {
1138 if let Some(delimiter) = writer_options.delimiter.chars().next() {
1139 if delimiter.is_ascii() {
1140 builder = builder.with_delimiter(delimiter as u8);
1141 } else {
1142 return Err(proto_error("CSV Delimiter is not ASCII"));
1143 }
1144 } else {
1145 return Err(proto_error("Error parsing CSV Delimiter"));
1146 }
1147 }
1148 if !writer_options.quote.is_empty() {
1149 if let Some(quote) = writer_options.quote.chars().next() {
1150 if quote.is_ascii() {
1151 builder = builder.with_quote(quote as u8);
1152 } else {
1153 return Err(proto_error("CSV Quote is not ASCII"));
1154 }
1155 } else {
1156 return Err(proto_error("Error parsing CSV Quote"));
1157 }
1158 }
1159 if !writer_options.escape.is_empty() {
1160 if let Some(escape) = writer_options.escape.chars().next() {
1161 if escape.is_ascii() {
1162 builder = builder.with_escape(escape as u8);
1163 } else {
1164 return Err(proto_error("CSV Escape is not ASCII"));
1165 }
1166 } else {
1167 return Err(proto_error("Error parsing CSV Escape"));
1168 }
1169 }
1170 Ok(builder
1171 .with_header(writer_options.has_header)
1172 .with_date_format(writer_options.date_format.clone())
1173 .with_datetime_format(writer_options.datetime_format.clone())
1174 .with_timestamp_format(writer_options.timestamp_format.clone())
1175 .with_time_format(writer_options.time_format.clone())
1176 .with_null(writer_options.null_value.clone())
1177 .with_double_quote(writer_options.double_quote))
1178}