1use std::sync::Arc;
19
20use datafusion::{
21 config::{
22 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23 TableParquetOptions,
24 },
25 datasource::file_format::{
26 arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27 parquet::ParquetFormatFactory, FileFormatFactory,
28 },
29 prelude::SessionContext,
30};
31use datafusion_common::{
32 exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33 TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38 parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39 JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40 ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41 TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// Logical extension codec that round-trips CSV file-format options
/// (via [`CsvOptionsProto`]); plan and table-provider methods are stubs.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50 fn from_factory(factory: &CsvFormatFactory) -> Self {
51 if let Some(options) = &factory.options {
52 CsvOptionsProto {
53 has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54 delimiter: vec![options.delimiter],
55 quote: vec![options.quote],
56 terminator: options.terminator.map_or(vec![], |v| vec![v]),
57 escape: options.escape.map_or(vec![], |v| vec![v]),
58 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59 compression: options.compression as i32,
60 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61 date_format: options.date_format.clone().unwrap_or_default(),
62 datetime_format: options.datetime_format.clone().unwrap_or_default(),
63 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64 timestamp_tz_format: options
65 .timestamp_tz_format
66 .clone()
67 .unwrap_or_default(),
68 time_format: options.time_format.clone().unwrap_or_default(),
69 null_value: options.null_value.clone().unwrap_or_default(),
70 null_regex: options.null_regex.clone().unwrap_or_default(),
71 comment: options.comment.map_or(vec![], |v| vec![v]),
72 newlines_in_values: options
73 .newlines_in_values
74 .map_or(vec![], |v| vec![v as u8]),
75 }
76 } else {
77 CsvOptionsProto::default()
78 }
79 }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83 fn from(proto: &CsvOptionsProto) -> Self {
84 CsvOptions {
85 has_header: if !proto.has_header.is_empty() {
86 Some(proto.has_header[0] != 0)
87 } else {
88 None
89 },
90 delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91 quote: proto.quote.first().copied().unwrap_or(b'"'),
92 terminator: if !proto.terminator.is_empty() {
93 Some(proto.terminator[0])
94 } else {
95 None
96 },
97 escape: if !proto.escape.is_empty() {
98 Some(proto.escape[0])
99 } else {
100 None
101 },
102 double_quote: if !proto.double_quote.is_empty() {
103 Some(proto.double_quote[0] != 0)
104 } else {
105 None
106 },
107 compression: match proto.compression {
108 0 => CompressionTypeVariant::GZIP,
109 1 => CompressionTypeVariant::BZIP2,
110 2 => CompressionTypeVariant::XZ,
111 3 => CompressionTypeVariant::ZSTD,
112 _ => CompressionTypeVariant::UNCOMPRESSED,
113 },
114 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115 date_format: if proto.date_format.is_empty() {
116 None
117 } else {
118 Some(proto.date_format.clone())
119 },
120 datetime_format: if proto.datetime_format.is_empty() {
121 None
122 } else {
123 Some(proto.datetime_format.clone())
124 },
125 timestamp_format: if proto.timestamp_format.is_empty() {
126 None
127 } else {
128 Some(proto.timestamp_format.clone())
129 },
130 timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131 None
132 } else {
133 Some(proto.timestamp_tz_format.clone())
134 },
135 time_format: if proto.time_format.is_empty() {
136 None
137 } else {
138 Some(proto.time_format.clone())
139 },
140 null_value: if proto.null_value.is_empty() {
141 None
142 } else {
143 Some(proto.null_value.clone())
144 },
145 null_regex: if proto.null_regex.is_empty() {
146 None
147 } else {
148 Some(proto.null_regex.clone())
149 },
150 comment: if !proto.comment.is_empty() {
151 Some(proto.comment[0])
152 } else {
153 None
154 },
155 newlines_in_values: if proto.newlines_in_values.is_empty() {
156 None
157 } else {
158 Some(proto.newlines_in_values[0] != 0)
159 },
160 }
161 }
162}
163
164impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166 fn try_decode(
167 &self,
168 _buf: &[u8],
169 _inputs: &[datafusion_expr::LogicalPlan],
170 _ctx: &SessionContext,
171 ) -> datafusion_common::Result<datafusion_expr::Extension> {
172 not_impl_err!("Method not implemented")
173 }
174
175 fn try_encode(
176 &self,
177 _node: &datafusion_expr::Extension,
178 _buf: &mut Vec<u8>,
179 ) -> datafusion_common::Result<()> {
180 not_impl_err!("Method not implemented")
181 }
182
183 fn try_decode_table_provider(
184 &self,
185 _buf: &[u8],
186 _table_ref: &TableReference,
187 _schema: arrow::datatypes::SchemaRef,
188 _ctx: &SessionContext,
189 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190 not_impl_err!("Method not implemented")
191 }
192
193 fn try_encode_table_provider(
194 &self,
195 _table_ref: &TableReference,
196 _node: Arc<dyn datafusion::datasource::TableProvider>,
197 _buf: &mut Vec<u8>,
198 ) -> datafusion_common::Result<()> {
199 not_impl_err!("Method not implemented")
200 }
201
202 fn try_decode_file_format(
203 &self,
204 buf: &[u8],
205 _ctx: &SessionContext,
206 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207 let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208 DataFusionError::Execution(format!("Failed to decode CsvOptionsProto: {e:?}"))
209 })?;
210 let options: CsvOptions = (&proto).into();
211 Ok(Arc::new(CsvFormatFactory {
212 options: Some(options),
213 }))
214 }
215
216 fn try_encode_file_format(
217 &self,
218 buf: &mut Vec<u8>,
219 node: Arc<dyn FileFormatFactory>,
220 ) -> datafusion_common::Result<()> {
221 let options =
222 if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
223 csv_factory.options.clone().unwrap_or_default()
224 } else {
225 return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
226 };
227
228 let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
229 options: Some(options),
230 });
231
232 proto.encode(buf).map_err(|e| {
233 DataFusionError::Execution(format!("Failed to encode CsvOptions: {e:?}"))
234 })?;
235
236 Ok(())
237 }
238}
239
240impl JsonOptionsProto {
241 fn from_factory(factory: &JsonFormatFactory) -> Self {
242 if let Some(options) = &factory.options {
243 JsonOptionsProto {
244 compression: options.compression as i32,
245 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
246 }
247 } else {
248 JsonOptionsProto::default()
249 }
250 }
251}
252
253impl From<&JsonOptionsProto> for JsonOptions {
254 fn from(proto: &JsonOptionsProto) -> Self {
255 JsonOptions {
256 compression: match proto.compression {
257 0 => CompressionTypeVariant::GZIP,
258 1 => CompressionTypeVariant::BZIP2,
259 2 => CompressionTypeVariant::XZ,
260 3 => CompressionTypeVariant::ZSTD,
261 _ => CompressionTypeVariant::UNCOMPRESSED,
262 },
263 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
264 }
265 }
266}
267
/// Logical extension codec that round-trips JSON file-format options
/// (via [`JsonOptionsProto`]); plan and table-provider methods are stubs.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
270
271impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
273 fn try_decode(
274 &self,
275 _buf: &[u8],
276 _inputs: &[datafusion_expr::LogicalPlan],
277 _ctx: &SessionContext,
278 ) -> datafusion_common::Result<datafusion_expr::Extension> {
279 not_impl_err!("Method not implemented")
280 }
281
282 fn try_encode(
283 &self,
284 _node: &datafusion_expr::Extension,
285 _buf: &mut Vec<u8>,
286 ) -> datafusion_common::Result<()> {
287 not_impl_err!("Method not implemented")
288 }
289
290 fn try_decode_table_provider(
291 &self,
292 _buf: &[u8],
293 _table_ref: &TableReference,
294 _schema: arrow::datatypes::SchemaRef,
295 _ctx: &SessionContext,
296 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
297 not_impl_err!("Method not implemented")
298 }
299
300 fn try_encode_table_provider(
301 &self,
302 _table_ref: &TableReference,
303 _node: Arc<dyn datafusion::datasource::TableProvider>,
304 _buf: &mut Vec<u8>,
305 ) -> datafusion_common::Result<()> {
306 not_impl_err!("Method not implemented")
307 }
308
309 fn try_decode_file_format(
310 &self,
311 buf: &[u8],
312 _ctx: &SessionContext,
313 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
314 let proto = JsonOptionsProto::decode(buf).map_err(|e| {
315 DataFusionError::Execution(format!(
316 "Failed to decode JsonOptionsProto: {e:?}"
317 ))
318 })?;
319 let options: JsonOptions = (&proto).into();
320 Ok(Arc::new(JsonFormatFactory {
321 options: Some(options),
322 }))
323 }
324
325 fn try_encode_file_format(
326 &self,
327 buf: &mut Vec<u8>,
328 node: Arc<dyn FileFormatFactory>,
329 ) -> datafusion_common::Result<()> {
330 let options = if let Some(json_factory) =
331 node.as_any().downcast_ref::<JsonFormatFactory>()
332 {
333 json_factory.options.clone().unwrap_or_default()
334 } else {
335 return Err(DataFusionError::Execution(
336 "Unsupported FileFormatFactory type".to_string(),
337 ));
338 };
339
340 let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
341 options: Some(options),
342 });
343
344 proto.encode(buf).map_err(|e| {
345 DataFusionError::Execution(format!("Failed to encode JsonOptions: {e:?}"))
346 })?;
347
348 Ok(())
349 }
350}
351
impl TableParquetOptionsProto {
    /// Serialize the `TableParquetOptions` held by a `ParquetFormatFactory`
    /// into protobuf form.
    ///
    /// Optional scalar settings are wrapped in their generated single-variant
    /// `oneof` types (e.g. `MetadataSizeHintOpt`) so "unset" survives the
    /// round trip; `usize` sizes are widened to `u64` for the wire format.
    /// Note: `key_value_metadata` entries whose value is `None` are dropped
    /// here and therefore do not round-trip.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        // A factory with no options encodes as the protobuf default message.
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        #[allow(deprecated)] TableParquetOptionsProto {
            // Global (table-wide) reader/writer options, field by field.
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                // NOTE: the closure parameter is named `compression` but
                // carries the coerce_int96 value.
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
            }),
            // Per-column overrides, keyed by column name.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                    })
                }
            }).collect(),
            // Entries with a `None` value are dropped (lossy; see doc note).
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
452
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Rebuild table-wide `ParquetOptions` from the protobuf message.
    ///
    /// Each generated single-variant `oneof` wrapper (`*_opt`) is unwrapped
    /// back into the `Option` field it encodes; wire-format `u64` sizes are
    /// narrowed to `usize` with `as` (values were widened from `usize` on the
    /// encode side, so this assumes the same pointer width — TODO confirm for
    /// 32-bit targets).
    fn from(proto: &ParquetOptionsProto) -> Self {
        #[allow(deprecated)] ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
        }
    }
}
510
511impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
512 fn from(proto: ParquetColumnOptionsProto) -> Self {
513 #[allow(deprecated)] ParquetColumnOptions {
515 bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
516 |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
517 ),
518 encoding: proto
519 .encoding_opt
520 .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
521 dictionary_enabled: proto.dictionary_enabled_opt.map(
522 |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
523 ),
524 compression: proto
525 .compression_opt
526 .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
527 statistics_enabled: proto.statistics_enabled_opt.map(
528 |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
529 ),
530 bloom_filter_fpp: proto
531 .bloom_filter_fpp_opt
532 .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
533 bloom_filter_ndv: proto
534 .bloom_filter_ndv_opt
535 .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
536 }
537 }
538}
539
540impl From<&TableParquetOptionsProto> for TableParquetOptions {
541 fn from(proto: &TableParquetOptionsProto) -> Self {
542 TableParquetOptions {
543 global: proto
544 .global
545 .as_ref()
546 .map(ParquetOptions::from)
547 .unwrap_or_default(),
548 column_specific_options: proto
549 .column_specific_options
550 .iter()
551 .map(|parquet_column_options| {
552 (
553 parquet_column_options.column_name.clone(),
554 ParquetColumnOptions::from(
555 parquet_column_options.options.clone().unwrap_or_default(),
556 ),
557 )
558 })
559 .collect(),
560 key_value_metadata: proto
561 .key_value_metadata
562 .iter()
563 .map(|(k, v)| (k.clone(), Some(v.clone())))
564 .collect(),
565 crypto: Default::default(),
566 }
567 }
568}
569
/// Logical extension codec that round-trips Parquet file-format options
/// (via [`TableParquetOptionsProto`]); plan and table-provider methods are stubs.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
572
573impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
575 fn try_decode(
576 &self,
577 _buf: &[u8],
578 _inputs: &[datafusion_expr::LogicalPlan],
579 _ctx: &SessionContext,
580 ) -> datafusion_common::Result<datafusion_expr::Extension> {
581 not_impl_err!("Method not implemented")
582 }
583
584 fn try_encode(
585 &self,
586 _node: &datafusion_expr::Extension,
587 _buf: &mut Vec<u8>,
588 ) -> datafusion_common::Result<()> {
589 not_impl_err!("Method not implemented")
590 }
591
592 fn try_decode_table_provider(
593 &self,
594 _buf: &[u8],
595 _table_ref: &TableReference,
596 _schema: arrow::datatypes::SchemaRef,
597 _ctx: &SessionContext,
598 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
599 not_impl_err!("Method not implemented")
600 }
601
602 fn try_encode_table_provider(
603 &self,
604 _table_ref: &TableReference,
605 _node: Arc<dyn datafusion::datasource::TableProvider>,
606 _buf: &mut Vec<u8>,
607 ) -> datafusion_common::Result<()> {
608 not_impl_err!("Method not implemented")
609 }
610
611 fn try_decode_file_format(
612 &self,
613 buf: &[u8],
614 _ctx: &SessionContext,
615 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
616 let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
617 DataFusionError::Execution(format!(
618 "Failed to decode TableParquetOptionsProto: {e:?}"
619 ))
620 })?;
621 let options: TableParquetOptions = (&proto).into();
622 Ok(Arc::new(ParquetFormatFactory {
623 options: Some(options),
624 }))
625 }
626
627 fn try_encode_file_format(
628 &self,
629 buf: &mut Vec<u8>,
630 node: Arc<dyn FileFormatFactory>,
631 ) -> datafusion_common::Result<()> {
632 let options = if let Some(parquet_factory) =
633 node.as_any().downcast_ref::<ParquetFormatFactory>()
634 {
635 parquet_factory.options.clone().unwrap_or_default()
636 } else {
637 return Err(DataFusionError::Execution(
638 "Unsupported FileFormatFactory type".to_string(),
639 ));
640 };
641
642 let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
643 options: Some(options),
644 });
645
646 proto.encode(buf).map_err(|e| {
647 DataFusionError::Execution(format!(
648 "Failed to encode TableParquetOptionsProto: {e:?}"
649 ))
650 })?;
651
652 Ok(())
653 }
654}
655
/// Logical extension codec for the Arrow IPC file format, which carries no
/// serializable options; plan and table-provider methods are stubs.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
658
659impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
661 fn try_decode(
662 &self,
663 _buf: &[u8],
664 _inputs: &[datafusion_expr::LogicalPlan],
665 _ctx: &SessionContext,
666 ) -> datafusion_common::Result<datafusion_expr::Extension> {
667 not_impl_err!("Method not implemented")
668 }
669
670 fn try_encode(
671 &self,
672 _node: &datafusion_expr::Extension,
673 _buf: &mut Vec<u8>,
674 ) -> datafusion_common::Result<()> {
675 not_impl_err!("Method not implemented")
676 }
677
678 fn try_decode_table_provider(
679 &self,
680 _buf: &[u8],
681 _table_ref: &TableReference,
682 _schema: arrow::datatypes::SchemaRef,
683 _ctx: &SessionContext,
684 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
685 not_impl_err!("Method not implemented")
686 }
687
688 fn try_encode_table_provider(
689 &self,
690 _table_ref: &TableReference,
691 _node: Arc<dyn datafusion::datasource::TableProvider>,
692 _buf: &mut Vec<u8>,
693 ) -> datafusion_common::Result<()> {
694 not_impl_err!("Method not implemented")
695 }
696
697 fn try_decode_file_format(
698 &self,
699 __buf: &[u8],
700 __ctx: &SessionContext,
701 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
702 Ok(Arc::new(ArrowFormatFactory::new()))
703 }
704
705 fn try_encode_file_format(
706 &self,
707 __buf: &mut Vec<u8>,
708 __node: Arc<dyn FileFormatFactory>,
709 ) -> datafusion_common::Result<()> {
710 Ok(())
711 }
712}
713
/// Logical extension codec for the Avro file format; plan and table-provider
/// methods are stubs, and file-format options carry no serialized state.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
716
717impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
719 fn try_decode(
720 &self,
721 _buf: &[u8],
722 _inputs: &[datafusion_expr::LogicalPlan],
723 _ctx: &SessionContext,
724 ) -> datafusion_common::Result<datafusion_expr::Extension> {
725 not_impl_err!("Method not implemented")
726 }
727
728 fn try_encode(
729 &self,
730 _node: &datafusion_expr::Extension,
731 _buf: &mut Vec<u8>,
732 ) -> datafusion_common::Result<()> {
733 not_impl_err!("Method not implemented")
734 }
735
736 fn try_decode_table_provider(
737 &self,
738 _buf: &[u8],
739 _table_ref: &TableReference,
740 _schema: arrow::datatypes::SchemaRef,
741 _cts: &SessionContext,
742 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
743 not_impl_err!("Method not implemented")
744 }
745
746 fn try_encode_table_provider(
747 &self,
748 _table_ref: &TableReference,
749 _node: Arc<dyn datafusion::datasource::TableProvider>,
750 _buf: &mut Vec<u8>,
751 ) -> datafusion_common::Result<()> {
752 not_impl_err!("Method not implemented")
753 }
754
755 fn try_decode_file_format(
756 &self,
757 __buf: &[u8],
758 __ctx: &SessionContext,
759 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
760 Ok(Arc::new(ArrowFormatFactory::new()))
761 }
762
763 fn try_encode_file_format(
764 &self,
765 __buf: &mut Vec<u8>,
766 __node: Arc<dyn FileFormatFactory>,
767 ) -> datafusion_common::Result<()> {
768 Ok(())
769 }
770}