1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::common::proto_error;
22use crate::protobuf_common as protobuf;
23use arrow::array::{ArrayRef, AsArray};
24use arrow::buffer::Buffer;
25use arrow::csv::{QuoteStyle, WriterBuilder};
26use arrow::datatypes::{
27 DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema,
28 TimeUnit, UnionFields, UnionMode, i256,
29};
30use arrow::ipc::{
31 convert::fb_to_schema,
32 reader::{read_dictionary, read_record_batch},
33 root_as_message,
34 writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
35};
36
37use datafusion_common::{
38 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
39 DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
40 arrow_datafusion_err,
41 config::{
42 CsvOptions, JsonOptions, ParquetCdcOptions, ParquetColumnOptions, ParquetOptions,
43 TableParquetOptions,
44 },
45 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
46 parsers::CompressionTypeVariant,
47 plan_datafusion_err,
48 stats::Precision,
49};
50
51#[derive(Debug)]
52pub enum Error {
53 General(String),
54
55 DataFusionError(DataFusionError),
56
57 MissingRequiredField(String),
58
59 AtLeastOneValue(String),
60
61 UnknownEnumVariant { name: String, value: i32 },
62}
63
64impl std::fmt::Display for Error {
65 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
66 match self {
67 Self::General(desc) => write!(f, "General error: {desc}"),
68
69 Self::DataFusionError(desc) => {
70 write!(f, "DataFusion error: {desc:?}")
71 }
72
73 Self::MissingRequiredField(name) => {
74 write!(f, "Missing required field {name}")
75 }
76 Self::AtLeastOneValue(name) => {
77 write!(f, "Must have at least one {name}, found 0")
78 }
79 Self::UnknownEnumVariant { name, value } => {
80 write!(f, "Unknown i32 value for {name} enum: {value}")
81 }
82 }
83 }
84}
85
86impl std::error::Error for Error {}
87
88impl From<DataFusionError> for Error {
89 fn from(e: DataFusionError) -> Self {
90 Error::DataFusionError(e)
91 }
92}
93
94impl Error {
95 pub fn required(field: impl Into<String>) -> Error {
96 Error::MissingRequiredField(field.into())
97 }
98
99 pub fn unknown(name: impl Into<String>, value: i32) -> Error {
100 Error::UnknownEnumVariant {
101 name: name.into(),
102 value,
103 }
104 }
105}
106
107impl From<Error> for DataFusionError {
108 fn from(e: Error) -> Self {
109 plan_datafusion_err!("{}", e)
110 }
111}
112
113pub trait FromOptionalField<T> {
116 fn optional(self) -> datafusion_common::Result<Option<T>, Error>;
121
122 fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
127}
128
129impl<T, U> FromOptionalField<U> for Option<T>
130where
131 T: TryInto<U, Error = Error>,
132{
133 fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
134 self.map(|t| t.try_into()).transpose()
135 }
136
137 fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
138 match self {
139 None => Err(Error::required(field)),
140 Some(t) => t.try_into(),
141 }
142 }
143}
144
145impl From<protobuf::ColumnRelation> for TableReference {
146 fn from(rel: protobuf::ColumnRelation) -> Self {
147 Self::parse_str_normalized(rel.relation.as_str(), true)
148 }
149}
150
151impl From<protobuf::Column> for Column {
152 fn from(c: protobuf::Column) -> Self {
153 let protobuf::Column { relation, name } = c;
154
155 Self::new(relation, name)
156 }
157}
158
159impl From<&protobuf::Column> for Column {
160 fn from(c: &protobuf::Column) -> Self {
161 c.clone().into()
162 }
163}
164
165impl TryFrom<&protobuf::DfSchema> for DFSchema {
166 type Error = Error;
167
168 fn try_from(
169 df_schema: &protobuf::DfSchema,
170 ) -> datafusion_common::Result<Self, Self::Error> {
171 let df_fields = df_schema.columns.clone();
172 let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
173 .iter()
174 .map(|df_field| {
175 let field: Field = df_field.field.as_ref().required("field")?;
176 Ok((
177 df_field.qualifier.as_ref().map(|q| q.clone().into()),
178 Arc::new(field),
179 ))
180 })
181 .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
182
183 Ok(DFSchema::new_with_metadata(
184 qualifiers_and_fields,
185 df_schema.metadata.clone(),
186 )?)
187 }
188}
189
190impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
191 type Error = Error;
192
193 fn try_from(
194 df_schema: protobuf::DfSchema,
195 ) -> datafusion_common::Result<Self, Self::Error> {
196 let dfschema: DFSchema = (&df_schema).try_into()?;
197 Ok(Arc::new(dfschema))
198 }
199}
200
201impl TryFrom<&protobuf::ArrowType> for DataType {
202 type Error = Error;
203
204 fn try_from(
205 arrow_type: &protobuf::ArrowType,
206 ) -> datafusion_common::Result<Self, Self::Error> {
207 arrow_type
208 .arrow_type_enum
209 .as_ref()
210 .required("arrow_type_enum")
211 }
212}
213
214impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
215 type Error = Error;
216 fn try_from(
217 arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
218 ) -> datafusion_common::Result<Self, Self::Error> {
219 use protobuf::arrow_type;
220 Ok(match arrow_type_enum {
221 arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
222 arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
223 arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
224 arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
225 arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
226 arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
227 arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
228 arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
229 arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
230 arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
231 arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
232 arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
233 arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
234 arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
235 arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
236 arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
237 arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
238 arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
239 arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
240 DataType::FixedSizeBinary(*size)
241 }
242 arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
243 arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
244 arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
245 arrow_type::ArrowTypeEnum::Duration(time_unit) => {
246 DataType::Duration(parse_i32_to_time_unit(time_unit)?)
247 }
248 arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
249 time_unit,
250 timezone,
251 }) => DataType::Timestamp(
252 parse_i32_to_time_unit(time_unit)?,
253 match timezone.len() {
254 0 => None,
255 _ => Some(timezone.as_str().into()),
256 },
257 ),
258 arrow_type::ArrowTypeEnum::Time32(time_unit) => {
259 DataType::Time32(parse_i32_to_time_unit(time_unit)?)
260 }
261 arrow_type::ArrowTypeEnum::Time64(time_unit) => {
262 DataType::Time64(parse_i32_to_time_unit(time_unit)?)
263 }
264 arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
265 DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
266 }
267 arrow_type::ArrowTypeEnum::Decimal32(protobuf::Decimal32Type {
268 precision,
269 scale,
270 }) => DataType::Decimal32(*precision as u8, *scale as i8),
271 arrow_type::ArrowTypeEnum::Decimal64(protobuf::Decimal64Type {
272 precision,
273 scale,
274 }) => DataType::Decimal64(*precision as u8, *scale as i8),
275 arrow_type::ArrowTypeEnum::Decimal128(protobuf::Decimal128Type {
276 precision,
277 scale,
278 }) => DataType::Decimal128(*precision as u8, *scale as i8),
279 arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
280 precision,
281 scale,
282 }) => DataType::Decimal256(*precision as u8, *scale as i8),
283 arrow_type::ArrowTypeEnum::List(list) => {
284 let list_type =
285 list.as_ref().field_type.as_deref().required("field_type")?;
286 DataType::List(Arc::new(list_type))
287 }
288 arrow_type::ArrowTypeEnum::LargeList(list) => {
289 let list_type =
290 list.as_ref().field_type.as_deref().required("field_type")?;
291 DataType::LargeList(Arc::new(list_type))
292 }
293 arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
294 let list_type =
295 list.as_ref().field_type.as_deref().required("field_type")?;
296 let list_size = list.list_size;
297 DataType::FixedSizeList(Arc::new(list_type), list_size)
298 }
299 arrow_type::ArrowTypeEnum::ListView(list) => {
300 let list_type =
301 list.as_ref().field_type.as_deref().required("field_type")?;
302 DataType::ListView(Arc::new(list_type))
303 }
304 arrow_type::ArrowTypeEnum::LargeListView(list) => {
305 let list_type =
306 list.as_ref().field_type.as_deref().required("field_type")?;
307 DataType::LargeListView(Arc::new(list_type))
308 }
309 arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
310 parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
311 ),
312 arrow_type::ArrowTypeEnum::Union(union) => {
313 let union_mode = protobuf::UnionMode::try_from(union.union_mode)
314 .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
315 let union_mode = match union_mode {
316 protobuf::UnionMode::Dense => UnionMode::Dense,
317 protobuf::UnionMode::Sparse => UnionMode::Sparse,
318 };
319 let union_fields = parse_proto_fields_to_fields(&union.union_types)?;
320
321 let union_fields = if union.type_ids.is_empty() {
323 UnionFields::from_fields(union_fields)
324 } else {
325 let type_ids = union.type_ids.iter().map(|i| *i as i8);
326 UnionFields::try_new(type_ids, union_fields).map_err(|e| {
327 DataFusionError::from(e).context("Deserializing Union DataType")
328 })?
329 };
330 DataType::Union(union_fields, union_mode)
331 }
332 arrow_type::ArrowTypeEnum::Dictionary(dict) => {
333 let key_datatype = dict.as_ref().key.as_deref().required("key")?;
334 let value_datatype = dict.as_ref().value.as_deref().required("value")?;
335 DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
336 }
337 arrow_type::ArrowTypeEnum::Map(map) => {
338 let field: Field =
339 map.as_ref().field_type.as_deref().required("field_type")?;
340 let keys_sorted = map.keys_sorted;
341 DataType::Map(Arc::new(field), keys_sorted)
342 }
343 arrow_type::ArrowTypeEnum::RunEndEncoded(run_end_encoded) => {
344 let run_ends_field: Field = run_end_encoded
345 .as_ref()
346 .run_ends_field
347 .as_deref()
348 .required("run_ends_field")?;
349 let value_field: Field = run_end_encoded
350 .as_ref()
351 .values_field
352 .as_deref()
353 .required("values_field")?;
354 DataType::RunEndEncoded(run_ends_field.into(), value_field.into())
355 }
356 })
357 }
358}
359
360impl TryFrom<&protobuf::Field> for Field {
361 type Error = Error;
362 fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
363 let datatype = field.arrow_type.as_deref().required("arrow_type")?;
364 let field = Self::new(field.name.as_str(), datatype, field.nullable)
365 .with_metadata(field.metadata.clone());
366 Ok(field)
367 }
368}
369
370impl TryFrom<&protobuf::Schema> for Schema {
371 type Error = Error;
372
373 fn try_from(
374 schema: &protobuf::Schema,
375 ) -> datafusion_common::Result<Self, Self::Error> {
376 let fields = schema
377 .columns
378 .iter()
379 .map(Field::try_from)
380 .collect::<datafusion_common::Result<Vec<_>, _>>()?;
381 Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
382 }
383}
384
385impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
386 type Error = Error;
387
388 fn try_from(
389 scalar: &protobuf::ScalarValue,
390 ) -> datafusion_common::Result<Self, Self::Error> {
391 use protobuf::scalar_value::Value;
392
393 let value = scalar
394 .value
395 .as_ref()
396 .ok_or_else(|| Error::required("value"))?;
397
398 Ok(match value {
399 Value::BoolValue(v) => Self::Boolean(Some(*v)),
400 Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
401 Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
402 Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
403 Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
404 Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
405 Value::Int32Value(v) => Self::Int32(Some(*v)),
406 Value::Int64Value(v) => Self::Int64(Some(*v)),
407 Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
408 Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
409 Value::Uint32Value(v) => Self::UInt32(Some(*v)),
410 Value::Uint64Value(v) => Self::UInt64(Some(*v)),
411 Value::Float32Value(v) => Self::Float32(Some(*v)),
412 Value::Float64Value(v) => Self::Float64(Some(*v)),
413 Value::Date32Value(v) => Self::Date32(Some(*v)),
414 Value::ListValue(v)
416 | Value::FixedSizeListValue(v)
417 | Value::LargeListValue(v)
418 | Value::ListViewValue(v)
419 | Value::LargeListViewValue(v)
420 | Value::StructValue(v)
421 | Value::MapValue(v) => {
422 let protobuf::ScalarNestedValue {
423 ipc_message,
424 arrow_data,
425 dictionaries,
426 schema,
427 } = &v;
428
429 let schema: Schema = if let Some(schema_ref) = schema {
430 schema_ref.try_into()?
431 } else {
432 return Err(Error::General(
433 "Invalid schema while deserializing nested ScalarValue"
434 .to_string(),
435 ));
436 };
437
438 let schema: Schema = {
442 let ipc_gen = IpcDataGenerator {};
443 let write_options = IpcWriteOptions::default();
444 let mut dict_tracker = DictionaryTracker::new(false);
445 let encoded_schema = ipc_gen.schema_to_bytes_with_dictionary_tracker(
446 &schema,
447 &mut dict_tracker,
448 &write_options,
449 );
450 let message =
451 root_as_message(encoded_schema.ipc_message.as_slice()).map_err(
452 |e| {
453 Error::General(format!(
454 "Error IPC schema message while deserializing nested ScalarValue: {e}"
455 ))
456 },
457 )?;
458 let ipc_schema = message.header_as_schema().ok_or_else(|| {
459 Error::General(
460 "Unexpected message type deserializing nested ScalarValue schema"
461 .to_string(),
462 )
463 })?;
464 fb_to_schema(ipc_schema)
465 };
466
467 let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
468 Error::General(format!(
469 "Error IPC message while deserializing nested ScalarValue: {e}"
470 ))
471 })?;
472 let buffer = Buffer::from(arrow_data.as_slice());
473
474 let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
475 Error::General(
476 "Unexpected message type deserializing nested ScalarValue"
477 .to_string(),
478 )
479 })?;
480
481 let mut dict_by_id: HashMap<i64, ArrayRef> = HashMap::new();
482 for protobuf::scalar_nested_value::Dictionary {
483 ipc_message,
484 arrow_data,
485 } in dictionaries
486 {
487 let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
488 Error::General(format!(
489 "Error IPC message while deserializing nested ScalarValue dictionary message: {e}"
490 ))
491 })?;
492 let buffer = Buffer::from(arrow_data.as_slice());
493
494 let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
495 Error::General(
496 "Unexpected message type deserializing nested ScalarValue dictionary message"
497 .to_string(),
498 )
499 })?;
500 read_dictionary(
501 &buffer,
502 dict_batch,
503 &schema,
504 &mut dict_by_id,
505 &message.version(),
506 )
507 .map_err(|e| arrow_datafusion_err!(e))
508 .map_err(|e| e.context("Decoding nested ScalarValue dictionary"))?;
509 }
510
511 let record_batch = read_record_batch(
512 &buffer,
513 ipc_batch,
514 Arc::new(schema),
515 &dict_by_id,
516 None,
517 &message.version(),
518 )
519 .map_err(|e| arrow_datafusion_err!(e))
520 .map_err(|e| e.context("Decoding nested ScalarValue value"))?;
521 let arr = record_batch.column(0);
522 match value {
523 Value::ListValue(_) => {
524 Self::List(arr.as_list::<i32>().to_owned().into())
525 }
526 Value::LargeListValue(_) => {
527 Self::LargeList(arr.as_list::<i64>().to_owned().into())
528 }
529 Value::FixedSizeListValue(_) => {
530 Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
531 }
532 Value::ListViewValue(_) => {
533 Self::ListView(arr.as_list_view::<i32>().to_owned().into())
534 }
535 Value::LargeListViewValue(_) => {
536 Self::LargeListView(arr.as_list_view::<i64>().to_owned().into())
537 }
538 Value::StructValue(_) => {
539 Self::Struct(arr.as_struct().to_owned().into())
540 }
541 Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
542 _ => unreachable!(),
543 }
544 }
545 Value::NullValue(v) => {
546 let null_type: DataType = v.try_into()?;
547 null_type.try_into().map_err(Error::DataFusionError)?
548 }
549 Value::Decimal32Value(val) => {
550 let array = vec_to_array(val.value.clone());
551 Self::Decimal32(Some(i32::from_be_bytes(array)), val.p as u8, val.s as i8)
552 }
553 Value::Decimal64Value(val) => {
554 let array = vec_to_array(val.value.clone());
555 Self::Decimal64(Some(i64::from_be_bytes(array)), val.p as u8, val.s as i8)
556 }
557 Value::Decimal128Value(val) => {
558 let array = vec_to_array(val.value.clone());
559 Self::Decimal128(
560 Some(i128::from_be_bytes(array)),
561 val.p as u8,
562 val.s as i8,
563 )
564 }
565 Value::Decimal256Value(val) => {
566 let array = vec_to_array(val.value.clone());
567 Self::Decimal256(
568 Some(i256::from_be_bytes(array)),
569 val.p as u8,
570 val.s as i8,
571 )
572 }
573 Value::Date64Value(v) => Self::Date64(Some(*v)),
574 Value::Time32Value(v) => {
575 let time_value =
576 v.value.as_ref().ok_or_else(|| Error::required("value"))?;
577 match time_value {
578 protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
579 Self::Time32Second(Some(*t))
580 }
581 protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
582 Self::Time32Millisecond(Some(*t))
583 }
584 }
585 }
586 Value::Time64Value(v) => {
587 let time_value =
588 v.value.as_ref().ok_or_else(|| Error::required("value"))?;
589 match time_value {
590 protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
591 Self::Time64Microsecond(Some(*t))
592 }
593 protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
594 Self::Time64Nanosecond(Some(*t))
595 }
596 }
597 }
598 Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
599 Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
600 Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
601 Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
602 Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
603 Value::TimestampValue(v) => {
604 let timezone = if v.timezone.is_empty() {
605 None
606 } else {
607 Some(v.timezone.as_str().into())
608 };
609
610 let ts_value =
611 v.value.as_ref().ok_or_else(|| Error::required("value"))?;
612
613 match ts_value {
614 protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
615 Self::TimestampMicrosecond(Some(*t), timezone)
616 }
617 protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
618 Self::TimestampNanosecond(Some(*t), timezone)
619 }
620 protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
621 Self::TimestampSecond(Some(*t), timezone)
622 }
623 protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
624 Self::TimestampMillisecond(Some(*t), timezone)
625 }
626 }
627 }
628 Value::DictionaryValue(v) => {
629 let index_type: DataType = v
630 .index_type
631 .as_ref()
632 .ok_or_else(|| Error::required("index_type"))?
633 .try_into()?;
634
635 let value: Self = v
636 .value
637 .as_ref()
638 .ok_or_else(|| Error::required("value"))?
639 .as_ref()
640 .try_into()?;
641
642 Self::Dictionary(Box::new(index_type), Box::new(value))
643 }
644 Value::RunEndEncodedValue(v) => {
645 let run_ends_field: Field = v
646 .run_ends_field
647 .as_ref()
648 .ok_or_else(|| Error::required("run_ends_field"))?
649 .try_into()?;
650
651 let values_field: Field = v
652 .values_field
653 .as_ref()
654 .ok_or_else(|| Error::required("values_field"))?
655 .try_into()?;
656
657 let value: Self = v
658 .value
659 .as_ref()
660 .ok_or_else(|| Error::required("value"))?
661 .as_ref()
662 .try_into()?;
663
664 Self::RunEndEncoded(
665 run_ends_field.into(),
666 values_field.into(),
667 Box::new(value),
668 )
669 }
670 Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
671 Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
672 Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
673 Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
674 IntervalDayTimeType::make_value(v.days, v.milliseconds),
675 )),
676 Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
677 IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
678 )),
679 Value::UnionValue(val) => {
680 let mode = match val.mode {
681 0 => UnionMode::Sparse,
682 1 => UnionMode::Dense,
683 id => Err(Error::unknown("UnionMode", id))?,
684 };
685 let ids = val
686 .fields
687 .iter()
688 .map(|f| f.field_id as i8)
689 .collect::<Vec<_>>();
690 let fields = val
691 .fields
692 .iter()
693 .map(|f| f.field.clone())
694 .collect::<Option<Vec<_>>>();
695 let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
696 let fields = parse_proto_fields_to_fields(&fields)?;
697 let union_fields = UnionFields::try_new(ids, fields).map_err(|e| {
698 DataFusionError::from(e).context("Deserializing Union ScalarValue")
699 })?;
700 let v_id = val.value_id as i8;
701 let val = match &val.value {
702 None => None,
703 Some(val) => {
704 let val: ScalarValue = val
705 .as_ref()
706 .try_into()
707 .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
708 Some((v_id, Box::new(val)))
709 }
710 };
711 Self::Union(val, union_fields, mode)
712 }
713 Value::FixedSizeBinaryValue(v) => {
714 Self::FixedSizeBinary(v.length, Some(v.clone().values))
715 }
716 })
717 }
718}
719
720impl From<protobuf::TimeUnit> for TimeUnit {
721 fn from(time_unit: protobuf::TimeUnit) -> Self {
722 match time_unit {
723 protobuf::TimeUnit::Second => TimeUnit::Second,
724 protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
725 protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
726 protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
727 }
728 }
729}
730
731impl From<protobuf::IntervalUnit> for IntervalUnit {
732 fn from(interval_unit: protobuf::IntervalUnit) -> Self {
733 match interval_unit {
734 protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
735 protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
736 protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
737 }
738 }
739}
740
741impl From<protobuf::Constraints> for Constraints {
742 fn from(constraints: protobuf::Constraints) -> Self {
743 Constraints::new_unverified(
744 constraints
745 .constraints
746 .into_iter()
747 .map(|item| item.into())
748 .collect(),
749 )
750 }
751}
752
753impl From<protobuf::Constraint> for Constraint {
754 fn from(value: protobuf::Constraint) -> Self {
755 match value.constraint_mode.unwrap() {
756 protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
757 Constraint::PrimaryKey(
758 elem.indices.into_iter().map(|item| item as usize).collect(),
759 )
760 }
761 protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
762 elem.indices.into_iter().map(|item| item as usize).collect(),
763 ),
764 }
765 }
766}
767
768impl From<&protobuf::ColumnStats> for ColumnStatistics {
769 fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
770 ColumnStatistics {
771 null_count: if let Some(nc) = &cs.null_count {
772 nc.clone().into()
773 } else {
774 Precision::Absent
775 },
776 max_value: if let Some(max) = &cs.max_value {
777 max.clone().into()
778 } else {
779 Precision::Absent
780 },
781 min_value: if let Some(min) = &cs.min_value {
782 min.clone().into()
783 } else {
784 Precision::Absent
785 },
786 sum_value: if let Some(sum) = &cs.sum_value {
787 sum.clone().into()
788 } else {
789 Precision::Absent
790 },
791 distinct_count: if let Some(dc) = &cs.distinct_count {
792 dc.clone().into()
793 } else {
794 Precision::Absent
795 },
796 byte_size: if let Some(sbs) = &cs.byte_size {
797 sbs.clone().into()
798 } else {
799 Precision::Absent
800 },
801 }
802 }
803}
804
805impl From<protobuf::Precision> for Precision<usize> {
806 fn from(s: protobuf::Precision) -> Self {
807 let Ok(precision_type) = s.precision_info.try_into() else {
808 return Precision::Absent;
809 };
810 match precision_type {
811 protobuf::PrecisionInfo::Exact => {
812 if let Some(val) = s.val {
813 if let Ok(ScalarValue::UInt64(Some(val))) =
814 ScalarValue::try_from(&val)
815 {
816 Precision::Exact(val as usize)
817 } else {
818 Precision::Absent
819 }
820 } else {
821 Precision::Absent
822 }
823 }
824 protobuf::PrecisionInfo::Inexact => {
825 if let Some(val) = s.val {
826 if let Ok(ScalarValue::UInt64(Some(val))) =
827 ScalarValue::try_from(&val)
828 {
829 Precision::Inexact(val as usize)
830 } else {
831 Precision::Absent
832 }
833 } else {
834 Precision::Absent
835 }
836 }
837 protobuf::PrecisionInfo::Absent => Precision::Absent,
838 }
839 }
840}
841
842impl From<protobuf::Precision> for Precision<ScalarValue> {
843 fn from(s: protobuf::Precision) -> Self {
844 let Ok(precision_type) = s.precision_info.try_into() else {
845 return Precision::Absent;
846 };
847 match precision_type {
848 protobuf::PrecisionInfo::Exact => {
849 if let Some(val) = s.val {
850 if let Ok(val) = ScalarValue::try_from(&val) {
851 Precision::Exact(val)
852 } else {
853 Precision::Absent
854 }
855 } else {
856 Precision::Absent
857 }
858 }
859 protobuf::PrecisionInfo::Inexact => {
860 if let Some(val) = s.val {
861 if let Ok(val) = ScalarValue::try_from(&val) {
862 Precision::Inexact(val)
863 } else {
864 Precision::Absent
865 }
866 } else {
867 Precision::Absent
868 }
869 }
870 protobuf::PrecisionInfo::Absent => Precision::Absent,
871 }
872 }
873}
874
875impl From<protobuf::JoinSide> for JoinSide {
876 fn from(t: protobuf::JoinSide) -> Self {
877 match t {
878 protobuf::JoinSide::LeftSide => JoinSide::Left,
879 protobuf::JoinSide::RightSide => JoinSide::Right,
880 protobuf::JoinSide::None => JoinSide::None,
881 }
882 }
883}
884
885impl From<&protobuf::Constraint> for Constraint {
886 fn from(value: &protobuf::Constraint) -> Self {
887 match &value.constraint_mode {
888 Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
889 Constraint::PrimaryKey(
890 elem.indices.iter().map(|&item| item as usize).collect(),
891 )
892 }
893 Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
894 Constraint::Unique(
895 elem.indices.iter().map(|&item| item as usize).collect(),
896 )
897 }
898 None => panic!("constraint_mode not set"),
899 }
900 }
901}
902
903impl TryFrom<&protobuf::Constraints> for Constraints {
904 type Error = DataFusionError;
905
906 fn try_from(
907 constraints: &protobuf::Constraints,
908 ) -> datafusion_common::Result<Self, Self::Error> {
909 Ok(Constraints::new_unverified(
910 constraints
911 .constraints
912 .iter()
913 .map(|item| item.into())
914 .collect(),
915 ))
916 }
917}
918
919impl TryFrom<&protobuf::Statistics> for Statistics {
920 type Error = DataFusionError;
921
922 fn try_from(
923 s: &protobuf::Statistics,
924 ) -> datafusion_common::Result<Self, Self::Error> {
925 Ok(Statistics {
927 num_rows: if let Some(nr) = &s.num_rows {
928 nr.clone().into()
929 } else {
930 Precision::Absent
931 },
932 total_byte_size: if let Some(tbs) = &s.total_byte_size {
933 tbs.clone().into()
934 } else {
935 Precision::Absent
936 },
937 column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
939 })
940 }
941}
942
943impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
944 fn from(value: protobuf::CompressionTypeVariant) -> Self {
945 match value {
946 protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
947 protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
948 protobuf::CompressionTypeVariant::Xz => Self::XZ,
949 protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
950 protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
951 }
952 }
953}
954
955impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
956 fn from(value: CompressionTypeVariant) -> Self {
957 match value {
958 CompressionTypeVariant::GZIP => Self::Gzip,
959 CompressionTypeVariant::BZIP2 => Self::Bzip2,
960 CompressionTypeVariant::XZ => Self::Xz,
961 CompressionTypeVariant::ZSTD => Self::Zstd,
962 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
963 }
964 }
965}
966
967impl From<protobuf::CsvQuoteStyle> for datafusion_common::parsers::CsvQuoteStyle {
968 fn from(value: protobuf::CsvQuoteStyle) -> Self {
969 match value {
970 protobuf::CsvQuoteStyle::Necessary => Self::Necessary,
971 protobuf::CsvQuoteStyle::Always => Self::Always,
972 protobuf::CsvQuoteStyle::NonNumeric => Self::NonNumeric,
973 protobuf::CsvQuoteStyle::Never => Self::Never,
974 }
975 }
976}
977
978impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
979 type Error = DataFusionError;
980
981 fn try_from(
982 opts: &protobuf::CsvWriterOptions,
983 ) -> datafusion_common::Result<Self, Self::Error> {
984 let write_options = csv_writer_options_from_proto(opts)?;
985 let compression: CompressionTypeVariant = opts.compression().into();
986 Ok(CsvWriterOptions::new(write_options, compression))
987 }
988}
989
990impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
991 type Error = DataFusionError;
992
993 fn try_from(
994 opts: &protobuf::JsonWriterOptions,
995 ) -> datafusion_common::Result<Self, Self::Error> {
996 let compression: CompressionTypeVariant = opts.compression().into();
997 Ok(JsonWriterOptions::new(compression))
998 }
999}
1000
1001impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
1002 type Error = DataFusionError;
1003
1004 fn try_from(
1005 proto_opts: &protobuf::CsvOptions,
1006 ) -> datafusion_common::Result<Self, Self::Error> {
1007 Ok(CsvOptions {
1008 has_header: proto_opts.has_header.first().map(|h| *h != 0),
1009 delimiter: proto_opts.delimiter[0],
1010 quote: proto_opts.quote[0],
1011 terminator: proto_opts.terminator.first().copied(),
1012 escape: proto_opts.escape.first().copied(),
1013 double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
1014 newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
1015 compression: proto_opts.compression().into(),
1016 compression_level: proto_opts.compression_level,
1017 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1018 date_format: (!proto_opts.date_format.is_empty())
1019 .then(|| proto_opts.date_format.clone()),
1020 datetime_format: (!proto_opts.datetime_format.is_empty())
1021 .then(|| proto_opts.datetime_format.clone()),
1022 timestamp_format: (!proto_opts.timestamp_format.is_empty())
1023 .then(|| proto_opts.timestamp_format.clone()),
1024 timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
1025 .then(|| proto_opts.timestamp_tz_format.clone()),
1026 time_format: (!proto_opts.time_format.is_empty())
1027 .then(|| proto_opts.time_format.clone()),
1028 null_value: (!proto_opts.null_value.is_empty())
1029 .then(|| proto_opts.null_value.clone()),
1030 null_regex: (!proto_opts.null_regex.is_empty())
1031 .then(|| proto_opts.null_regex.clone()),
1032 comment: proto_opts.comment.first().copied(),
1033 truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
1034 quote_style: proto_opts.quote_style().into(),
1035 ignore_leading_whitespace: proto_opts
1036 .ignore_leading_whitespace
1037 .first()
1038 .map(|h| *h != 0),
1039 ignore_trailing_whitespace: proto_opts
1040 .ignore_trailing_whitespace
1041 .first()
1042 .map(|h| *h != 0),
1043 })
1044 }
1045}
1046
1047impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
1048 type Error = DataFusionError;
1049
1050 fn try_from(
1051 value: &protobuf::ParquetOptions,
1052 ) -> datafusion_common::Result<Self, Self::Error> {
1053 Ok(ParquetOptions {
1054 enable_page_index: value.enable_page_index,
1055 pruning: value.pruning,
1056 skip_metadata: value.skip_metadata,
1057 metadata_size_hint: value
1058 .metadata_size_hint_opt
1059 .map(|opt| match opt {
1060 protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
1061 })
1062 .unwrap_or(None),
1063 pushdown_filters: value.pushdown_filters,
1064 reorder_filters: value.reorder_filters,
1065 force_filter_selections: value.force_filter_selections,
1066 data_pagesize_limit: value.data_pagesize_limit as usize,
1067 write_batch_size: value.write_batch_size as usize,
1068 writer_version: value.writer_version.parse().map_err(|e| {
1069 DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
1070 })?,
1071 compression: value.compression_opt.clone().map(|opt| match opt {
1072 protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
1073 }).unwrap_or(None),
1074 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1075 dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
1077 statistics_enabled: value
1078 .statistics_enabled_opt.clone()
1079 .map(|opt| match opt {
1080 protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1081 })
1082 .unwrap_or(None),
1083 max_row_group_size: value.max_row_group_size as usize,
1084 created_by: value.created_by.clone(),
1085 column_index_truncate_length: value
1086 .column_index_truncate_length_opt.as_ref()
1087 .map(|opt| match opt {
1088 protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
1089 })
1090 .unwrap_or(None),
1091 statistics_truncate_length: value
1092 .statistics_truncate_length_opt.as_ref()
1093 .map(|opt| match opt {
1094 protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
1095 })
1096 .unwrap_or(None),
1097 data_page_row_count_limit: value.data_page_row_count_limit as usize,
1098 encoding: value
1099 .encoding_opt.clone()
1100 .map(|opt| match opt {
1101 protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
1102 })
1103 .unwrap_or(None),
1104 bloom_filter_on_read: value.bloom_filter_on_read,
1105 bloom_filter_on_write: value.bloom_filter_on_write,
1106 bloom_filter_fpp: value.clone()
1107 .bloom_filter_fpp_opt
1108 .map(|opt| match opt {
1109 protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1110 })
1111 .unwrap_or(None),
1112 bloom_filter_ndv: value.clone()
1113 .bloom_filter_ndv_opt
1114 .map(|opt| match opt {
1115 protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1116 })
1117 .unwrap_or(None),
1118 allow_single_file_parallelism: value.allow_single_file_parallelism,
1119 maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
1120 maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
1121 schema_force_view_types: value.schema_force_view_types,
1122 binary_as_string: value.binary_as_string,
1123 coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
1124 protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
1125 }).unwrap_or(None),
1126 coerce_int96_tz: value.coerce_int96_tz_opt.clone().map(|opt| match opt {
1127 protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => Some(v),
1128 }).unwrap_or(None),
1129 skip_arrow_metadata: value.skip_arrow_metadata,
1130 max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
1131 protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
1132 }).unwrap_or(None),
1133 content_defined_chunking: value.content_defined_chunking.map(ParquetCdcOptions::from).unwrap_or_default(),
1134 })
1135 }
1136}
1137
1138impl From<protobuf::ParquetCdcOptions> for ParquetCdcOptions {
1139 fn from(value: protobuf::ParquetCdcOptions) -> Self {
1140 ParquetCdcOptions {
1141 enabled: value.enabled,
1142 min_chunk_size: value.min_chunk_size as usize,
1143 max_chunk_size: value.max_chunk_size as usize,
1144 norm_level: value.norm_level,
1145 }
1146 }
1147}
1148
1149impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
1150 type Error = DataFusionError;
1151 fn try_from(
1152 value: &protobuf::ParquetColumnOptions,
1153 ) -> datafusion_common::Result<Self, Self::Error> {
1154 Ok(ParquetColumnOptions {
1155 compression: value.compression_opt.clone().map(|opt| match opt {
1156 protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1157 }).unwrap_or(None),
1158 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1159 statistics_enabled: value
1160 .statistics_enabled_opt.clone()
1161 .map(|opt| match opt {
1162 protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1163 })
1164 .unwrap_or(None),
1165 encoding: value
1166 .encoding_opt.clone()
1167 .map(|opt| match opt {
1168 protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1169 })
1170 .unwrap_or(None),
1171 bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1172 protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1173 })
1174 .unwrap_or(None),
1175 bloom_filter_fpp: value
1176 .bloom_filter_fpp_opt
1177 .map(|opt| match opt {
1178 protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1179 })
1180 .unwrap_or(None),
1181 bloom_filter_ndv: value
1182 .bloom_filter_ndv_opt
1183 .map(|opt| match opt {
1184 protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1185 })
1186 .unwrap_or(None),
1187 })
1188 }
1189}
1190
1191impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1192 type Error = DataFusionError;
1193 fn try_from(
1194 value: &protobuf::TableParquetOptions,
1195 ) -> datafusion_common::Result<Self, Self::Error> {
1196 let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1197 HashMap::new();
1198 for protobuf::ParquetColumnSpecificOptions {
1199 column_name,
1200 options: maybe_options,
1201 } in &value.column_specific_options
1202 {
1203 if let Some(options) = maybe_options {
1204 column_specific_options.insert(column_name.clone(), options.try_into()?);
1205 }
1206 }
1207 let opts = TableParquetOptions {
1208 global: value
1209 .global
1210 .as_ref()
1211 .map(|v| v.try_into())
1212 .unwrap()
1213 .unwrap(),
1214 column_specific_options,
1215 ..Default::default()
1216 };
1217 Ok(opts)
1218 }
1219}
1220
1221impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1222 type Error = DataFusionError;
1223
1224 fn try_from(
1225 proto_opts: &protobuf::JsonOptions,
1226 ) -> datafusion_common::Result<Self, Self::Error> {
1227 let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1228 Ok(JsonOptions {
1229 compression: compression.into(),
1230 compression_level: proto_opts.compression_level,
1231 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1232 newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
1233 })
1234 }
1235}
1236
1237pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1238 protobuf::TimeUnit::try_from(*value)
1239 .map(|t| t.into())
1240 .map_err(|_| Error::unknown("TimeUnit", *value))
1241}
1242
1243pub fn parse_i32_to_interval_unit(
1244 value: &i32,
1245) -> datafusion_common::Result<IntervalUnit, Error> {
1246 protobuf::IntervalUnit::try_from(*value)
1247 .map(|t| t.into())
1248 .map_err(|_| Error::unknown("IntervalUnit", *value))
1249}
1250
/// Moves the elements of `v` into a fixed-size array `[T; N]`.
///
/// # Panics
///
/// Panics if `v.len() != N`, reporting both the expected and actual length.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    match <[T; N]>::try_from(v) {
        Ok(array) => array,
        Err(rejected) => {
            panic!("Expected a Vec of length {} but it was {}", N, rejected.len())
        }
    }
}
1257
1258pub fn parse_proto_fields_to_fields<'a, I>(
1260 fields: I,
1261) -> std::result::Result<Vec<Field>, Error>
1262where
1263 I: IntoIterator<Item = &'a protobuf::Field>,
1264{
1265 fields
1266 .into_iter()
1267 .map(Field::try_from)
1268 .collect::<datafusion_common::Result<_, _>>()
1269}
1270
1271pub(crate) fn csv_writer_options_from_proto(
1272 writer_options: &protobuf::CsvWriterOptions,
1273) -> datafusion_common::Result<WriterBuilder> {
1274 let mut builder = WriterBuilder::new();
1275 if !writer_options.delimiter.is_empty() {
1276 if let Some(delimiter) = writer_options.delimiter.chars().next() {
1277 if delimiter.is_ascii() {
1278 builder = builder.with_delimiter(delimiter as u8);
1279 } else {
1280 return Err(proto_error("CSV Delimiter is not ASCII"));
1281 }
1282 } else {
1283 return Err(proto_error("Error parsing CSV Delimiter"));
1284 }
1285 }
1286 if !writer_options.quote.is_empty() {
1287 if let Some(quote) = writer_options.quote.chars().next() {
1288 if quote.is_ascii() {
1289 builder = builder.with_quote(quote as u8);
1290 } else {
1291 return Err(proto_error("CSV Quote is not ASCII"));
1292 }
1293 } else {
1294 return Err(proto_error("Error parsing CSV Quote"));
1295 }
1296 }
1297 if !writer_options.escape.is_empty() {
1298 if let Some(escape) = writer_options.escape.chars().next() {
1299 if escape.is_ascii() {
1300 builder = builder.with_escape(escape as u8);
1301 } else {
1302 return Err(proto_error("CSV Escape is not ASCII"));
1303 }
1304 } else {
1305 return Err(proto_error("Error parsing CSV Escape"));
1306 }
1307 }
1308 let quote_style = match protobuf::CsvQuoteStyle::try_from(writer_options.quote_style)
1309 {
1310 Ok(protobuf::CsvQuoteStyle::Always) => QuoteStyle::Always,
1311 Ok(protobuf::CsvQuoteStyle::NonNumeric) => QuoteStyle::NonNumeric,
1312 Ok(protobuf::CsvQuoteStyle::Never) => QuoteStyle::Never,
1313 Ok(protobuf::CsvQuoteStyle::Necessary) => QuoteStyle::Necessary,
1314 _ => Err(proto_error(
1315 "Unknown quote style, must be one of: 'Always', 'NonNumeric', 'Never', 'Necessary'",
1316 ))?,
1317 };
1318 Ok(builder
1319 .with_header(writer_options.has_header)
1320 .with_date_format(writer_options.date_format.clone())
1321 .with_datetime_format(writer_options.datetime_format.clone())
1322 .with_timestamp_format(writer_options.timestamp_format.clone())
1323 .with_time_format(writer_options.time_format.clone())
1324 .with_null(writer_options.null_value.clone())
1325 .with_double_quote(writer_options.double_quote)
1326 .with_quote_style(quote_style)
1327 .with_ignore_leading_whitespace(writer_options.ignore_leading_whitespace)
1328 .with_ignore_trailing_whitespace(writer_options.ignore_trailing_whitespace))
1329}
1330
#[cfg(test)]
mod tests {
    use datafusion_common::config::{
        ParquetCdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions,
    };

    /// Encodes `opts` to protobuf and decodes it back.
    fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
        let proto: crate::protobuf_common::ParquetOptions =
            (&opts).try_into().expect("to_proto");
        ParquetOptions::try_from(&proto).expect("from_proto")
    }

    /// Encodes `opts` to protobuf and decodes it back.
    fn table_parquet_options_proto_round_trip(
        opts: TableParquetOptions,
    ) -> TableParquetOptions {
        let proto: crate::protobuf_common::TableParquetOptions =
            (&opts).try_into().expect("to_proto");
        TableParquetOptions::try_from(&proto).expect("from_proto")
    }

    #[test]
    fn test_parquet_options_cdc_disabled_round_trip() {
        let opts = ParquetOptions::default();
        assert!(!opts.content_defined_chunking.enabled);
        let recovered = parquet_options_proto_round_trip(opts.clone());
        assert_eq!(opts, recovered);
    }

    #[test]
    fn test_parquet_options_coerce_int96_tz_unset_round_trip() {
        let opts = ParquetOptions::default();
        assert!(opts.coerce_int96_tz.is_none());
        let recovered = parquet_options_proto_round_trip(opts.clone());
        assert_eq!(recovered.coerce_int96_tz, None);
    }

    #[test]
    fn test_parquet_options_coerce_int96_tz_set_round_trip() {
        let opts = ParquetOptions {
            coerce_int96: Some("us".to_string()),
            coerce_int96_tz: Some("UTC".to_string()),
            ..ParquetOptions::default()
        };
        let recovered = parquet_options_proto_round_trip(opts.clone());
        assert_eq!(recovered.coerce_int96, Some("us".to_string()));
        assert_eq!(recovered.coerce_int96_tz, Some("UTC".to_string()));
    }

    /// Optional settings travel as `oneof` fields; make sure set values survive.
    #[test]
    fn test_parquet_options_optional_fields_round_trip() {
        let opts = ParquetOptions {
            compression: Some("zstd(3)".to_string()),
            statistics_truncate_length: Some(128),
            bloom_filter_fpp: Some(0.05),
            bloom_filter_ndv: Some(1000),
            ..ParquetOptions::default()
        };
        let recovered = parquet_options_proto_round_trip(opts.clone());
        assert_eq!(recovered.compression, Some("zstd(3)".to_string()));
        assert_eq!(recovered.statistics_truncate_length, Some(128));
        assert_eq!(recovered.bloom_filter_fpp, Some(0.05));
        assert_eq!(recovered.bloom_filter_ndv, Some(1000));
    }

    #[test]
    fn test_table_parquet_options_coerce_int96_tz_round_trip() {
        let mut opts = TableParquetOptions::default();
        opts.global.coerce_int96 = Some("us".to_string());
        opts.global.coerce_int96_tz = Some("America/Los_Angeles".to_string());
        let recovered = table_parquet_options_proto_round_trip(opts.clone());
        assert_eq!(recovered.global.coerce_int96, Some("us".to_string()));
        assert_eq!(
            recovered.global.coerce_int96_tz,
            Some("America/Los_Angeles".to_string())
        );
    }

    /// Exercises the per-column branch of `TableParquetOptions` decoding.
    #[test]
    fn test_table_parquet_options_column_specific_round_trip() {
        let mut opts = TableParquetOptions::default();
        opts.column_specific_options.insert(
            "col_a".to_string(),
            ParquetColumnOptions {
                compression: Some("snappy".to_string()),
                dictionary_enabled: Some(false),
                bloom_filter_enabled: Some(true),
                bloom_filter_fpp: Some(0.1),
                bloom_filter_ndv: Some(42),
                ..ParquetColumnOptions::default()
            },
        );
        let recovered = table_parquet_options_proto_round_trip(opts);
        assert_eq!(recovered.column_specific_options.len(), 1);
        let col = &recovered.column_specific_options["col_a"];
        assert_eq!(col.compression, Some("snappy".to_string()));
        assert_eq!(col.dictionary_enabled, Some(false));
        assert_eq!(col.bloom_filter_enabled, Some(true));
        assert_eq!(col.bloom_filter_fpp, Some(0.1));
        assert_eq!(col.bloom_filter_ndv, Some(42));
        assert_eq!(col.statistics_enabled, None);
        assert_eq!(col.encoding, None);
    }

    #[test]
    fn test_parquet_options_cdc_enabled_round_trip() {
        let opts = ParquetOptions {
            content_defined_chunking: ParquetCdcOptions {
                enabled: true,
                min_chunk_size: 128 * 1024,
                max_chunk_size: 512 * 1024,
                norm_level: 2,
            },
            ..ParquetOptions::default()
        };
        let recovered = parquet_options_proto_round_trip(opts.clone());
        let cdc = recovered.content_defined_chunking;
        assert!(cdc.enabled);
        assert_eq!(cdc.min_chunk_size, 128 * 1024);
        assert_eq!(cdc.max_chunk_size, 512 * 1024);
        assert_eq!(cdc.norm_level, 2);
    }

    #[test]
    fn test_parquet_options_cdc_negative_norm_level_round_trip() {
        let opts = ParquetOptions {
            content_defined_chunking: ParquetCdcOptions {
                enabled: true,
                norm_level: -3,
                ..ParquetCdcOptions::default()
            },
            ..ParquetOptions::default()
        };
        let recovered = parquet_options_proto_round_trip(opts);
        assert_eq!(recovered.content_defined_chunking.norm_level, -3);
    }

    #[test]
    fn test_table_parquet_options_cdc_round_trip() {
        let mut opts = TableParquetOptions::default();
        opts.global.content_defined_chunking = ParquetCdcOptions {
            enabled: true,
            min_chunk_size: 64 * 1024,
            max_chunk_size: 2 * 1024 * 1024,
            norm_level: -1,
        };

        let recovered = table_parquet_options_proto_round_trip(opts.clone());
        let cdc = recovered.global.content_defined_chunking;
        assert!(cdc.enabled);
        assert_eq!(cdc.min_chunk_size, 64 * 1024);
        assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
        assert_eq!(cdc.norm_level, -1);
    }

    #[test]
    fn test_table_parquet_options_cdc_disabled_round_trip() {
        let opts = TableParquetOptions::default();
        assert!(!opts.global.content_defined_chunking.enabled);
        let recovered = table_parquet_options_proto_round_trip(opts.clone());
        assert!(!recovered.global.content_defined_chunking.enabled);
    }
}