1use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28 DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
29 TimeUnit, UnionFields, UnionMode, i256,
30};
31use arrow::ipc::{reader::read_record_batch, root_as_message};
32
33use datafusion_common::{
34 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
35 DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
36 arrow_datafusion_err,
37 config::{
38 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
39 TableParquetOptions,
40 },
41 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
42 parsers::CompressionTypeVariant,
43 plan_datafusion_err,
44 stats::Precision,
45};
46
/// Errors that can occur while decoding protobuf messages into
/// DataFusion / Arrow native types.
#[derive(Debug)]
pub enum Error {
    // Free-form error description.
    General(String),

    // Wraps an error bubbled up from DataFusion itself.
    DataFusionError(DataFusionError),

    // A protobuf field that must be present was missing; carries the field name.
    MissingRequiredField(String),

    // A repeated field that must be non-empty was empty; carries the field name.
    AtLeastOneValue(String),

    // An i32 read from the wire did not map to any variant of the named enum.
    UnknownEnumVariant { name: String, value: i32 },
}
59
60impl std::fmt::Display for Error {
61 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62 match self {
63 Self::General(desc) => write!(f, "General error: {desc}"),
64
65 Self::DataFusionError(desc) => {
66 write!(f, "DataFusion error: {desc:?}")
67 }
68
69 Self::MissingRequiredField(name) => {
70 write!(f, "Missing required field {name}")
71 }
72 Self::AtLeastOneValue(name) => {
73 write!(f, "Must have at least one {name}, found 0")
74 }
75 Self::UnknownEnumVariant { name, value } => {
76 write!(f, "Unknown i32 value for {name} enum: {value}")
77 }
78 }
79 }
80}
81
// Marker impl so `Error` can be used as a `dyn std::error::Error`.
impl std::error::Error for Error {}
83
84impl From<DataFusionError> for Error {
85 fn from(e: DataFusionError) -> Self {
86 Error::DataFusionError(e)
87 }
88}
89
90impl Error {
91 pub fn required(field: impl Into<String>) -> Error {
92 Error::MissingRequiredField(field.into())
93 }
94
95 pub fn unknown(name: impl Into<String>, value: i32) -> Error {
96 Error::UnknownEnumVariant {
97 name: name.into(),
98 value,
99 }
100 }
101}
102
103impl From<Error> for DataFusionError {
104 fn from(e: Error) -> Self {
105 plan_datafusion_err!("{}", e)
106 }
107}
108
/// Helper for converting `Option<ProtoType>` fields into native types.
///
/// Protobuf message fields are modeled as `Option<T>`; this trait adds the
/// two common decode modes: pass the `None` through, or treat `None` as a
/// missing-required-field error.
pub trait FromOptionalField<T> {
    // Converts `Some(t)` via `TryInto`, propagating conversion errors;
    // `None` stays `None`.
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    // Like `optional`, but `None` becomes `Error::MissingRequiredField(field)`.
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
124
125impl<T, U> FromOptionalField<U> for Option<T>
126where
127 T: TryInto<U, Error = Error>,
128{
129 fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
130 self.map(|t| t.try_into()).transpose()
131 }
132
133 fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
134 match self {
135 None => Err(Error::required(field)),
136 Some(t) => t.try_into(),
137 }
138 }
139}
140
141impl From<protobuf::ColumnRelation> for TableReference {
142 fn from(rel: protobuf::ColumnRelation) -> Self {
143 Self::parse_str_normalized(rel.relation.as_str(), true)
144 }
145}
146
147impl From<protobuf::Column> for Column {
148 fn from(c: protobuf::Column) -> Self {
149 let protobuf::Column { relation, name } = c;
150
151 Self::new(relation, name)
152 }
153}
154
155impl From<&protobuf::Column> for Column {
156 fn from(c: &protobuf::Column) -> Self {
157 c.clone().into()
158 }
159}
160
161impl TryFrom<&protobuf::DfSchema> for DFSchema {
162 type Error = Error;
163
164 fn try_from(
165 df_schema: &protobuf::DfSchema,
166 ) -> datafusion_common::Result<Self, Self::Error> {
167 let df_fields = df_schema.columns.clone();
168 let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
169 .iter()
170 .map(|df_field| {
171 let field: Field = df_field.field.as_ref().required("field")?;
172 Ok((
173 df_field.qualifier.as_ref().map(|q| q.clone().into()),
174 Arc::new(field),
175 ))
176 })
177 .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
178
179 Ok(DFSchema::new_with_metadata(
180 qualifiers_and_fields,
181 df_schema.metadata.clone(),
182 )?)
183 }
184}
185
186impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
187 type Error = Error;
188
189 fn try_from(
190 df_schema: protobuf::DfSchema,
191 ) -> datafusion_common::Result<Self, Self::Error> {
192 let dfschema: DFSchema = (&df_schema).try_into()?;
193 Ok(Arc::new(dfschema))
194 }
195}
196
197impl TryFrom<&protobuf::ArrowType> for DataType {
198 type Error = Error;
199
200 fn try_from(
201 arrow_type: &protobuf::ArrowType,
202 ) -> datafusion_common::Result<Self, Self::Error> {
203 arrow_type
204 .arrow_type_enum
205 .as_ref()
206 .required("arrow_type_enum")
207 }
208}
209
/// Decode the protobuf `ArrowTypeEnum` oneof into an Arrow [`DataType`].
///
/// Primitive variants map 1:1; nested variants (list, struct, union,
/// dictionary, map) recursively convert their child field descriptors.
///
/// # Errors
/// Returns [`Error::MissingRequiredField`] when a required child message is
/// absent and [`Error::UnknownEnumVariant`] for unrecognized unit/mode ids.
impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            // An empty timezone string encodes "no timezone".
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            // Decimal precision/scale travel as wider ints on the wire;
            // narrow them back to Arrow's u8/i8 representation.
            arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
                precision,
                scale,
            }) => DataType::Decimal32(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
                precision,
                scale,
            }) => DataType::Decimal64(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // When no explicit type ids were serialized, default to the
                // sequential ids 0..N.
                let type_ids: Vec<_> = match union.type_ids.is_empty() {
                    true => (0..union_fields.len() as i8).collect(),
                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
                };

                DataType::Union(UnionFields::new(type_ids, union_fields), union_mode)
            }
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
        })
    }
}
329
330impl TryFrom<&protobuf::Field> for Field {
331 type Error = Error;
332 fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
333 let datatype = field.arrow_type.as_deref().required("arrow_type")?;
334 let field = Self::new(field.name.as_str(), datatype, field.nullable)
335 .with_metadata(field.metadata.clone());
336 Ok(field)
337 }
338}
339
340impl TryFrom<&protobuf::Schema> for Schema {
341 type Error = Error;
342
343 fn try_from(
344 schema: &protobuf::Schema,
345 ) -> datafusion_common::Result<Self, Self::Error> {
346 let fields = schema
347 .columns
348 .iter()
349 .map(Field::try_from)
350 .collect::<datafusion_common::Result<Vec<_>, _>>()?;
351 Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
352 }
353}
354
/// Decode a protobuf `ScalarValue` into a DataFusion [`ScalarValue`].
///
/// Primitive variants are unwrapped directly. Nested variants
/// (list/large-list/fixed-size-list/struct/map) are shipped as a serialized
/// Arrow IPC record batch — optionally with dictionary batches — whose single
/// column carries the value.
///
/// # Errors
/// Returns [`Error::MissingRequiredField`] for absent required sub-messages
/// and [`Error::General`] when IPC payloads cannot be decoded.
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        // The oneof payload is required; a message without it is malformed.
        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            // Narrow integer types travel as i32/u32 on the wire; cast back.
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // All nested variants share one decode path; the specific
            // ScalarValue constructor is chosen at the end by re-matching.
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing ScalarValue::List"
                            .to_string(),
                    ));
                };

                // Parse the flatbuffer-encoded IPC message header.
                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing ScalarValue::List: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing ScalarValue::List"
                            .to_string(),
                    )
                })?;

                // Decode any dictionary batches first, keyed by dictionary id,
                // so the record batch below can resolve dictionary columns.
                let dict_by_id: HashMap<i64,ArrayRef> = dictionaries.iter().map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                        Error::General(format!(
                            "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
                        ))
                    })?;
                    let buffer = Buffer::from(arrow_data.as_slice());

                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing ScalarValue::List dictionary message"
                                .to_string(),
                        )
                    })?;

                    let id = dict_batch.id();

                    let record_batch = read_record_batch(
                        &buffer,
                        dict_batch.data().unwrap(),
                        Arc::new(schema.clone()),
                        &Default::default(),
                        None,
                        &message.version(),
                    )?;

                    let values: ArrayRef = Arc::clone(record_batch.column(0));

                    Ok((id, values))
                }).collect::<datafusion_common::Result<HashMap<_, _>>>()?;

                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
                // Column 0 holds the single nested value.
                let arr = record_batch.column(0);
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // Outer match already restricted `value` to the five
                    // nested variants above.
                    _ => unreachable!(),
                }
            }
            // Typed NULL: decode the data type, then build a null scalar of it.
            Value::NullValue(v) => {
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            // Decimals ship their unscaled value as big-endian bytes.
            Value::Decimal32Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal64Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
            }
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // Empty timezone string encodes "no timezone".
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            // Dictionary scalar: key type plus a boxed inner scalar value
            // (decoded recursively).
            Value::DictionaryValue(v) => {
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                // Union mode comes over the wire as a raw i32 (0/1).
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // collect::<Option<Vec<_>>> is None if ANY field is missing
                // its inner Field message.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let fields = UnionFields::new(ids, fields);
                let v_id = val.value_id as i8;
                // The active variant's payload is optional (a NULL union).
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                Self::FixedSizeBinary(v.length, Some(v.clone().values))
            }
        })
    }
}
625
626impl From<protobuf::TimeUnit> for TimeUnit {
627 fn from(time_unit: protobuf::TimeUnit) -> Self {
628 match time_unit {
629 protobuf::TimeUnit::Second => TimeUnit::Second,
630 protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
631 protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
632 protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
633 }
634 }
635}
636
637impl From<protobuf::IntervalUnit> for IntervalUnit {
638 fn from(interval_unit: protobuf::IntervalUnit) -> Self {
639 match interval_unit {
640 protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
641 protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
642 protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
643 }
644 }
645}
646
647impl From<protobuf::Constraints> for Constraints {
648 fn from(constraints: protobuf::Constraints) -> Self {
649 Constraints::new_unverified(
650 constraints
651 .constraints
652 .into_iter()
653 .map(|item| item.into())
654 .collect(),
655 )
656 }
657}
658
659impl From<protobuf::Constraint> for Constraint {
660 fn from(value: protobuf::Constraint) -> Self {
661 match value.constraint_mode.unwrap() {
662 protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
663 Constraint::PrimaryKey(
664 elem.indices.into_iter().map(|item| item as usize).collect(),
665 )
666 }
667 protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
668 elem.indices.into_iter().map(|item| item as usize).collect(),
669 ),
670 }
671 }
672}
673
674impl From<&protobuf::ColumnStats> for ColumnStatistics {
675 fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
676 ColumnStatistics {
677 null_count: if let Some(nc) = &cs.null_count {
678 nc.clone().into()
679 } else {
680 Precision::Absent
681 },
682 max_value: if let Some(max) = &cs.max_value {
683 max.clone().into()
684 } else {
685 Precision::Absent
686 },
687 min_value: if let Some(min) = &cs.min_value {
688 min.clone().into()
689 } else {
690 Precision::Absent
691 },
692 sum_value: if let Some(sum) = &cs.sum_value {
693 sum.clone().into()
694 } else {
695 Precision::Absent
696 },
697 distinct_count: if let Some(dc) = &cs.distinct_count {
698 dc.clone().into()
699 } else {
700 Precision::Absent
701 },
702 byte_size: if let Some(sbs) = &cs.byte_size {
703 sbs.clone().into()
704 } else {
705 Precision::Absent
706 },
707 }
708 }
709}
710
711impl From<protobuf::Precision> for Precision<usize> {
712 fn from(s: protobuf::Precision) -> Self {
713 let Ok(precision_type) = s.precision_info.try_into() else {
714 return Precision::Absent;
715 };
716 match precision_type {
717 protobuf::PrecisionInfo::Exact => {
718 if let Some(val) = s.val {
719 if let Ok(ScalarValue::UInt64(Some(val))) =
720 ScalarValue::try_from(&val)
721 {
722 Precision::Exact(val as usize)
723 } else {
724 Precision::Absent
725 }
726 } else {
727 Precision::Absent
728 }
729 }
730 protobuf::PrecisionInfo::Inexact => {
731 if let Some(val) = s.val {
732 if let Ok(ScalarValue::UInt64(Some(val))) =
733 ScalarValue::try_from(&val)
734 {
735 Precision::Inexact(val as usize)
736 } else {
737 Precision::Absent
738 }
739 } else {
740 Precision::Absent
741 }
742 }
743 protobuf::PrecisionInfo::Absent => Precision::Absent,
744 }
745 }
746}
747
748impl From<protobuf::Precision> for Precision<ScalarValue> {
749 fn from(s: protobuf::Precision) -> Self {
750 let Ok(precision_type) = s.precision_info.try_into() else {
751 return Precision::Absent;
752 };
753 match precision_type {
754 protobuf::PrecisionInfo::Exact => {
755 if let Some(val) = s.val {
756 if let Ok(val) = ScalarValue::try_from(&val) {
757 Precision::Exact(val)
758 } else {
759 Precision::Absent
760 }
761 } else {
762 Precision::Absent
763 }
764 }
765 protobuf::PrecisionInfo::Inexact => {
766 if let Some(val) = s.val {
767 if let Ok(val) = ScalarValue::try_from(&val) {
768 Precision::Inexact(val)
769 } else {
770 Precision::Absent
771 }
772 } else {
773 Precision::Absent
774 }
775 }
776 protobuf::PrecisionInfo::Absent => Precision::Absent,
777 }
778 }
779}
780
781impl From<protobuf::JoinSide> for JoinSide {
782 fn from(t: protobuf::JoinSide) -> Self {
783 match t {
784 protobuf::JoinSide::LeftSide => JoinSide::Left,
785 protobuf::JoinSide::RightSide => JoinSide::Right,
786 protobuf::JoinSide::None => JoinSide::None,
787 }
788 }
789}
790
791impl From<&protobuf::Constraint> for Constraint {
792 fn from(value: &protobuf::Constraint) -> Self {
793 match &value.constraint_mode {
794 Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
795 Constraint::PrimaryKey(
796 elem.indices.iter().map(|&item| item as usize).collect(),
797 )
798 }
799 Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
800 Constraint::Unique(
801 elem.indices.iter().map(|&item| item as usize).collect(),
802 )
803 }
804 None => panic!("constraint_mode not set"),
805 }
806 }
807}
808
809impl TryFrom<&protobuf::Constraints> for Constraints {
810 type Error = DataFusionError;
811
812 fn try_from(
813 constraints: &protobuf::Constraints,
814 ) -> datafusion_common::Result<Self, Self::Error> {
815 Ok(Constraints::new_unverified(
816 constraints
817 .constraints
818 .iter()
819 .map(|item| item.into())
820 .collect(),
821 ))
822 }
823}
824
825impl TryFrom<&protobuf::Statistics> for Statistics {
826 type Error = DataFusionError;
827
828 fn try_from(
829 s: &protobuf::Statistics,
830 ) -> datafusion_common::Result<Self, Self::Error> {
831 Ok(Statistics {
833 num_rows: if let Some(nr) = &s.num_rows {
834 nr.clone().into()
835 } else {
836 Precision::Absent
837 },
838 total_byte_size: if let Some(tbs) = &s.total_byte_size {
839 tbs.clone().into()
840 } else {
841 Precision::Absent
842 },
843 column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
845 })
846 }
847}
848
849impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
850 fn from(value: protobuf::CompressionTypeVariant) -> Self {
851 match value {
852 protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
853 protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
854 protobuf::CompressionTypeVariant::Xz => Self::XZ,
855 protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
856 protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
857 }
858 }
859}
860
861impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
862 fn from(value: CompressionTypeVariant) -> Self {
863 match value {
864 CompressionTypeVariant::GZIP => Self::Gzip,
865 CompressionTypeVariant::BZIP2 => Self::Bzip2,
866 CompressionTypeVariant::XZ => Self::Xz,
867 CompressionTypeVariant::ZSTD => Self::Zstd,
868 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
869 }
870 }
871}
872
873impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
874 type Error = DataFusionError;
875
876 fn try_from(
877 opts: &protobuf::CsvWriterOptions,
878 ) -> datafusion_common::Result<Self, Self::Error> {
879 let write_options = csv_writer_options_from_proto(opts)?;
880 let compression: CompressionTypeVariant = opts.compression().into();
881 Ok(CsvWriterOptions::new(write_options, compression))
882 }
883}
884
885impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
886 type Error = DataFusionError;
887
888 fn try_from(
889 opts: &protobuf::JsonWriterOptions,
890 ) -> datafusion_common::Result<Self, Self::Error> {
891 let compression: CompressionTypeVariant = opts.compression().into();
892 Ok(JsonWriterOptions::new(compression))
893 }
894}
895
/// Decode CSV read/write options from proto.
///
/// Single-byte options are serialized as byte vectors: an empty vector means
/// "unset" (decoded via `.first()`), a one-element vector carries the value.
/// Boolean-ish options use the same scheme with `0`/non-zero bytes.
impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
    type Error = DataFusionError;

    fn try_from(
        proto_opts: &protobuf::CsvOptions,
    ) -> datafusion_common::Result<Self, Self::Error> {
        Ok(CsvOptions {
            has_header: proto_opts.has_header.first().map(|h| *h != 0),
            // NOTE(review): unlike the `.first()`-based fields, these two
            // index directly and will panic on an empty vector — presumably
            // the serializer always writes them; confirm against the encoder.
            delimiter: proto_opts.delimiter[0],
            quote: proto_opts.quote[0],
            terminator: proto_opts.terminator.first().copied(),
            escape: proto_opts.escape.first().copied(),
            double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
            newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
            compression: proto_opts.compression().into(),
            compression_level: proto_opts.compression_level,
            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
            // String options use the empty string as the "unset" sentinel.
            date_format: (!proto_opts.date_format.is_empty())
                .then(|| proto_opts.date_format.clone()),
            datetime_format: (!proto_opts.datetime_format.is_empty())
                .then(|| proto_opts.datetime_format.clone()),
            timestamp_format: (!proto_opts.timestamp_format.is_empty())
                .then(|| proto_opts.timestamp_format.clone()),
            timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
                .then(|| proto_opts.timestamp_tz_format.clone()),
            time_format: (!proto_opts.time_format.is_empty())
                .then(|| proto_opts.time_format.clone()),
            null_value: (!proto_opts.null_value.is_empty())
                .then(|| proto_opts.null_value.clone()),
            null_regex: (!proto_opts.null_regex.is_empty())
                .then(|| proto_opts.null_regex.clone()),
            comment: proto_opts.comment.first().copied(),
            truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
        })
    }
}
932
933impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
934 type Error = DataFusionError;
935
936 fn try_from(
937 value: &protobuf::ParquetOptions,
938 ) -> datafusion_common::Result<Self, Self::Error> {
939 Ok(ParquetOptions {
940 enable_page_index: value.enable_page_index,
941 pruning: value.pruning,
942 skip_metadata: value.skip_metadata,
943 metadata_size_hint: value
944 .metadata_size_hint_opt
945 .map(|opt| match opt {
946 protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
947 })
948 .unwrap_or(None),
949 pushdown_filters: value.pushdown_filters,
950 reorder_filters: value.reorder_filters,
951 force_filter_selections: value.force_filter_selections,
952 data_pagesize_limit: value.data_pagesize_limit as usize,
953 write_batch_size: value.write_batch_size as usize,
954 writer_version: value.writer_version.parse().map_err(|e| {
955 DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
956 })?,
957 compression: value.compression_opt.clone().map(|opt| match opt {
958 protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
959 }).unwrap_or(None),
960 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
961 dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
963 statistics_enabled: value
964 .statistics_enabled_opt.clone()
965 .map(|opt| match opt {
966 protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
967 })
968 .unwrap_or(None),
969 max_row_group_size: value.max_row_group_size as usize,
970 created_by: value.created_by.clone(),
971 column_index_truncate_length: value
972 .column_index_truncate_length_opt.as_ref()
973 .map(|opt| match opt {
974 protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
975 })
976 .unwrap_or(None),
977 statistics_truncate_length: value
978 .statistics_truncate_length_opt.as_ref()
979 .map(|opt| match opt {
980 protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
981 })
982 .unwrap_or(None),
983 data_page_row_count_limit: value.data_page_row_count_limit as usize,
984 encoding: value
985 .encoding_opt.clone()
986 .map(|opt| match opt {
987 protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
988 })
989 .unwrap_or(None),
990 bloom_filter_on_read: value.bloom_filter_on_read,
991 bloom_filter_on_write: value.bloom_filter_on_write,
992 bloom_filter_fpp: value.clone()
993 .bloom_filter_fpp_opt
994 .map(|opt| match opt {
995 protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
996 })
997 .unwrap_or(None),
998 bloom_filter_ndv: value.clone()
999 .bloom_filter_ndv_opt
1000 .map(|opt| match opt {
1001 protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1002 })
1003 .unwrap_or(None),
1004 allow_single_file_parallelism: value.allow_single_file_parallelism,
1005 maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
1006 maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
1007 schema_force_view_types: value.schema_force_view_types,
1008 binary_as_string: value.binary_as_string,
1009 coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1010 protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1011 }).unwrap_or(None),
1012 skip_arrow_metadata: value.skip_arrow_metadata,
1013 max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1014 protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1015 }).unwrap_or(None),
1016 })
1017 }
1018}
1019
1020impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1021 type Error = DataFusionError;
1022 fn try_from(
1023 value: &protobuf::ParquetColumnOptions,
1024 ) -> datafusion_common::Result<Self, Self::Error> {
1025 Ok(ParquetColumnOptions {
1026 compression: value.compression_opt.clone().map(|opt| match opt {
1027 protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1028 }).unwrap_or(None),
1029 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1030 statistics_enabled: value
1031 .statistics_enabled_opt.clone()
1032 .map(|opt| match opt {
1033 protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1034 })
1035 .unwrap_or(None),
1036 encoding: value
1037 .encoding_opt.clone()
1038 .map(|opt| match opt {
1039 protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1040 })
1041 .unwrap_or(None),
1042 bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1043 protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1044 })
1045 .unwrap_or(None),
1046 bloom_filter_fpp: value
1047 .bloom_filter_fpp_opt
1048 .map(|opt| match opt {
1049 protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1050 })
1051 .unwrap_or(None),
1052 bloom_filter_ndv: value
1053 .bloom_filter_ndv_opt
1054 .map(|opt| match opt {
1055 protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1056 })
1057 .unwrap_or(None),
1058 })
1059 }
1060}
1061
1062impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1063 type Error = DataFusionError;
1064 fn try_from(
1065 value: &protobuf::TableParquetOptions,
1066 ) -> datafusion_common::Result<Self, Self::Error> {
1067 let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1068 HashMap::new();
1069 for protobuf::ParquetColumnSpecificOptions {
1070 column_name,
1071 options: maybe_options,
1072 } in &value.column_specific_options
1073 {
1074 if let Some(options) = maybe_options {
1075 column_specific_options.insert(column_name.clone(), options.try_into()?);
1076 }
1077 }
1078 Ok(TableParquetOptions {
1079 global: value
1080 .global
1081 .as_ref()
1082 .map(|v| v.try_into())
1083 .unwrap()
1084 .unwrap(),
1085 column_specific_options,
1086 key_value_metadata: Default::default(),
1087 crypto: Default::default(),
1088 })
1089 }
1090}
1091
1092impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1093 type Error = DataFusionError;
1094
1095 fn try_from(
1096 proto_opts: &protobuf::JsonOptions,
1097 ) -> datafusion_common::Result<Self, Self::Error> {
1098 let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1099 Ok(JsonOptions {
1100 compression: compression.into(),
1101 compression_level: proto_opts.compression_level,
1102 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1103 })
1104 }
1105}
1106
1107pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1108 protobuf::TimeUnit::try_from(*value)
1109 .map(|t| t.into())
1110 .map_err(|_| Error::unknown("TimeUnit", *value))
1111}
1112
1113pub fn parse_i32_to_interval_unit(
1114 value: &i32,
1115) -> datafusion_common::Result<IntervalUnit, Error> {
1116 protobuf::IntervalUnit::try_from(*value)
1117 .map(|t| t.into())
1118 .map_err(|_| Error::unknown("IntervalUnit", *value))
1119}
1120
/// Convert a `Vec<T>` into a fixed-size array `[T; N]`.
///
/// # Panics
/// Panics if `v.len() != N`.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    match v.try_into() {
        Ok(array) => array,
        // `TryFrom<Vec<T>> for [T; N]` hands the vector back on failure,
        // so its actual length can be reported.
        Err(v) => panic!("Expected a Vec of length {} but it was {}", N, v.len()),
    }
}
1127
1128pub fn parse_proto_fields_to_fields<'a, I>(
1130 fields: I,
1131) -> std::result::Result<Vec<Field>, Error>
1132where
1133 I: IntoIterator<Item = &'a protobuf::Field>,
1134{
1135 fields
1136 .into_iter()
1137 .map(Field::try_from)
1138 .collect::<datafusion_common::Result<_, _>>()
1139}
1140
1141pub(crate) fn csv_writer_options_from_proto(
1142 writer_options: &protobuf::CsvWriterOptions,
1143) -> datafusion_common::Result<WriterBuilder> {
1144 let mut builder = WriterBuilder::new();
1145 if !writer_options.delimiter.is_empty() {
1146 if let Some(delimiter) = writer_options.delimiter.chars().next() {
1147 if delimiter.is_ascii() {
1148 builder = builder.with_delimiter(delimiter as u8);
1149 } else {
1150 return Err(proto_error("CSV Delimiter is not ASCII"));
1151 }
1152 } else {
1153 return Err(proto_error("Error parsing CSV Delimiter"));
1154 }
1155 }
1156 if !writer_options.quote.is_empty() {
1157 if let Some(quote) = writer_options.quote.chars().next() {
1158 if quote.is_ascii() {
1159 builder = builder.with_quote(quote as u8);
1160 } else {
1161 return Err(proto_error("CSV Quote is not ASCII"));
1162 }
1163 } else {
1164 return Err(proto_error("Error parsing CSV Quote"));
1165 }
1166 }
1167 if !writer_options.escape.is_empty() {
1168 if let Some(escape) = writer_options.escape.chars().next() {
1169 if escape.is_ascii() {
1170 builder = builder.with_escape(escape as u8);
1171 } else {
1172 return Err(proto_error("CSV Escape is not ASCII"));
1173 }
1174 } else {
1175 return Err(proto_error("Error parsing CSV Escape"));
1176 }
1177 }
1178 Ok(builder
1179 .with_header(writer_options.has_header)
1180 .with_date_format(writer_options.date_format.clone())
1181 .with_datetime_format(writer_options.datetime_format.clone())
1182 .with_timestamp_format(writer_options.timestamp_format.clone())
1183 .with_time_format(writer_options.time_format.clone())
1184 .with_null(writer_options.null_value.clone())
1185 .with_double_quote(writer_options.double_quote))
1186}