1use std::sync::Arc;
19
20use datafusion::{
21 config::{
22 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23 TableParquetOptions,
24 },
25 datasource::file_format::{
26 arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27 parquet::ParquetFormatFactory, FileFormatFactory,
28 },
29 prelude::SessionContext,
30};
31use datafusion_common::{
32 exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33 TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38 parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39 JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40 ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41 TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// [`LogicalExtensionCodec`] that serializes and deserializes CSV file-format
/// options ([`CsvOptions`]) via their protobuf representation.
///
/// Only the `try_{encode,decode}_file_format` methods are implemented; plan
/// and table-provider (de)serialization return "not implemented" errors.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50 fn from_factory(factory: &CsvFormatFactory) -> Self {
51 if let Some(options) = &factory.options {
52 CsvOptionsProto {
53 has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54 delimiter: vec![options.delimiter],
55 quote: vec![options.quote],
56 terminator: options.terminator.map_or(vec![], |v| vec![v]),
57 escape: options.escape.map_or(vec![], |v| vec![v]),
58 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59 compression: options.compression as i32,
60 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61 date_format: options.date_format.clone().unwrap_or_default(),
62 datetime_format: options.datetime_format.clone().unwrap_or_default(),
63 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64 timestamp_tz_format: options
65 .timestamp_tz_format
66 .clone()
67 .unwrap_or_default(),
68 time_format: options.time_format.clone().unwrap_or_default(),
69 null_value: options.null_value.clone().unwrap_or_default(),
70 null_regex: options.null_regex.clone().unwrap_or_default(),
71 comment: options.comment.map_or(vec![], |v| vec![v]),
72 newlines_in_values: options
73 .newlines_in_values
74 .map_or(vec![], |v| vec![v as u8]),
75 }
76 } else {
77 CsvOptionsProto::default()
78 }
79 }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83 fn from(proto: &CsvOptionsProto) -> Self {
84 CsvOptions {
85 has_header: if !proto.has_header.is_empty() {
86 Some(proto.has_header[0] != 0)
87 } else {
88 None
89 },
90 delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91 quote: proto.quote.first().copied().unwrap_or(b'"'),
92 terminator: if !proto.terminator.is_empty() {
93 Some(proto.terminator[0])
94 } else {
95 None
96 },
97 escape: if !proto.escape.is_empty() {
98 Some(proto.escape[0])
99 } else {
100 None
101 },
102 double_quote: if !proto.double_quote.is_empty() {
103 Some(proto.double_quote[0] != 0)
104 } else {
105 None
106 },
107 compression: match proto.compression {
108 0 => CompressionTypeVariant::GZIP,
109 1 => CompressionTypeVariant::BZIP2,
110 2 => CompressionTypeVariant::XZ,
111 3 => CompressionTypeVariant::ZSTD,
112 _ => CompressionTypeVariant::UNCOMPRESSED,
113 },
114 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115 date_format: if proto.date_format.is_empty() {
116 None
117 } else {
118 Some(proto.date_format.clone())
119 },
120 datetime_format: if proto.datetime_format.is_empty() {
121 None
122 } else {
123 Some(proto.datetime_format.clone())
124 },
125 timestamp_format: if proto.timestamp_format.is_empty() {
126 None
127 } else {
128 Some(proto.timestamp_format.clone())
129 },
130 timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131 None
132 } else {
133 Some(proto.timestamp_tz_format.clone())
134 },
135 time_format: if proto.time_format.is_empty() {
136 None
137 } else {
138 Some(proto.time_format.clone())
139 },
140 null_value: if proto.null_value.is_empty() {
141 None
142 } else {
143 Some(proto.null_value.clone())
144 },
145 null_regex: if proto.null_regex.is_empty() {
146 None
147 } else {
148 Some(proto.null_regex.clone())
149 },
150 comment: if !proto.comment.is_empty() {
151 Some(proto.comment[0])
152 } else {
153 None
154 },
155 newlines_in_values: if proto.newlines_in_values.is_empty() {
156 None
157 } else {
158 Some(proto.newlines_in_values[0] != 0)
159 },
160 }
161 }
162}
163
164impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166 fn try_decode(
167 &self,
168 _buf: &[u8],
169 _inputs: &[datafusion_expr::LogicalPlan],
170 _ctx: &SessionContext,
171 ) -> datafusion_common::Result<datafusion_expr::Extension> {
172 not_impl_err!("Method not implemented")
173 }
174
175 fn try_encode(
176 &self,
177 _node: &datafusion_expr::Extension,
178 _buf: &mut Vec<u8>,
179 ) -> datafusion_common::Result<()> {
180 not_impl_err!("Method not implemented")
181 }
182
183 fn try_decode_table_provider(
184 &self,
185 _buf: &[u8],
186 _table_ref: &TableReference,
187 _schema: arrow::datatypes::SchemaRef,
188 _ctx: &SessionContext,
189 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190 not_impl_err!("Method not implemented")
191 }
192
193 fn try_encode_table_provider(
194 &self,
195 _table_ref: &TableReference,
196 _node: Arc<dyn datafusion::datasource::TableProvider>,
197 _buf: &mut Vec<u8>,
198 ) -> datafusion_common::Result<()> {
199 not_impl_err!("Method not implemented")
200 }
201
202 fn try_decode_file_format(
203 &self,
204 buf: &[u8],
205 _ctx: &SessionContext,
206 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207 let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208 DataFusionError::Execution(format!(
209 "Failed to decode CsvOptionsProto: {:?}",
210 e
211 ))
212 })?;
213 let options: CsvOptions = (&proto).into();
214 Ok(Arc::new(CsvFormatFactory {
215 options: Some(options),
216 }))
217 }
218
219 fn try_encode_file_format(
220 &self,
221 buf: &mut Vec<u8>,
222 node: Arc<dyn FileFormatFactory>,
223 ) -> datafusion_common::Result<()> {
224 let options =
225 if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
226 csv_factory.options.clone().unwrap_or_default()
227 } else {
228 return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
229 };
230
231 let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
232 options: Some(options),
233 });
234
235 proto.encode(buf).map_err(|e| {
236 DataFusionError::Execution(format!("Failed to encode CsvOptions: {:?}", e))
237 })?;
238
239 Ok(())
240 }
241}
242
243impl JsonOptionsProto {
244 fn from_factory(factory: &JsonFormatFactory) -> Self {
245 if let Some(options) = &factory.options {
246 JsonOptionsProto {
247 compression: options.compression as i32,
248 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
249 }
250 } else {
251 JsonOptionsProto::default()
252 }
253 }
254}
255
256impl From<&JsonOptionsProto> for JsonOptions {
257 fn from(proto: &JsonOptionsProto) -> Self {
258 JsonOptions {
259 compression: match proto.compression {
260 0 => CompressionTypeVariant::GZIP,
261 1 => CompressionTypeVariant::BZIP2,
262 2 => CompressionTypeVariant::XZ,
263 3 => CompressionTypeVariant::ZSTD,
264 _ => CompressionTypeVariant::UNCOMPRESSED,
265 },
266 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
267 }
268 }
269}
270
/// [`LogicalExtensionCodec`] that serializes and deserializes JSON file-format
/// options ([`JsonOptions`]) via their protobuf representation.
///
/// Only the `try_{encode,decode}_file_format` methods are implemented; plan
/// and table-provider (de)serialization return "not implemented" errors.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
273
274impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
276 fn try_decode(
277 &self,
278 _buf: &[u8],
279 _inputs: &[datafusion_expr::LogicalPlan],
280 _ctx: &SessionContext,
281 ) -> datafusion_common::Result<datafusion_expr::Extension> {
282 not_impl_err!("Method not implemented")
283 }
284
285 fn try_encode(
286 &self,
287 _node: &datafusion_expr::Extension,
288 _buf: &mut Vec<u8>,
289 ) -> datafusion_common::Result<()> {
290 not_impl_err!("Method not implemented")
291 }
292
293 fn try_decode_table_provider(
294 &self,
295 _buf: &[u8],
296 _table_ref: &TableReference,
297 _schema: arrow::datatypes::SchemaRef,
298 _ctx: &SessionContext,
299 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
300 not_impl_err!("Method not implemented")
301 }
302
303 fn try_encode_table_provider(
304 &self,
305 _table_ref: &TableReference,
306 _node: Arc<dyn datafusion::datasource::TableProvider>,
307 _buf: &mut Vec<u8>,
308 ) -> datafusion_common::Result<()> {
309 not_impl_err!("Method not implemented")
310 }
311
312 fn try_decode_file_format(
313 &self,
314 buf: &[u8],
315 _ctx: &SessionContext,
316 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
317 let proto = JsonOptionsProto::decode(buf).map_err(|e| {
318 DataFusionError::Execution(format!(
319 "Failed to decode JsonOptionsProto: {:?}",
320 e
321 ))
322 })?;
323 let options: JsonOptions = (&proto).into();
324 Ok(Arc::new(JsonFormatFactory {
325 options: Some(options),
326 }))
327 }
328
329 fn try_encode_file_format(
330 &self,
331 buf: &mut Vec<u8>,
332 node: Arc<dyn FileFormatFactory>,
333 ) -> datafusion_common::Result<()> {
334 let options = if let Some(json_factory) =
335 node.as_any().downcast_ref::<JsonFormatFactory>()
336 {
337 json_factory.options.clone().unwrap_or_default()
338 } else {
339 return Err(DataFusionError::Execution(
340 "Unsupported FileFormatFactory type".to_string(),
341 ));
342 };
343
344 let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
345 options: Some(options),
346 });
347
348 proto.encode(buf).map_err(|e| {
349 DataFusionError::Execution(format!("Failed to encode JsonOptions: {:?}", e))
350 })?;
351
352 Ok(())
353 }
354}
355
impl TableParquetOptionsProto {
    /// Converts the [`TableParquetOptions`] held by a [`ParquetFormatFactory`]
    /// into their protobuf representation.
    ///
    /// A factory without options yields `TableParquetOptionsProto::default()`.
    /// Global writer/reader options, per-column overrides, and key/value
    /// metadata are all serialized. Optional scalar fields are wrapped in
    /// their generated `oneof` wrapper enums so that "unset" survives the
    /// round trip; metadata entries whose value is `None` are dropped (the
    /// proto map stores plain strings, not options).
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        // `max_statistics_size` is deprecated upstream but still serialized
        // for wire compatibility.
        #[allow(deprecated)]
        TableParquetOptionsProto {
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| {
                    parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
            }),
            // One `ParquetColumnSpecificOptions` entry per column override.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                        max_statistics_size_opt: options.max_statistics_size.map(|size| {
                            parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32)
                        }),
                    })
                }
            }).collect(),
            // Entries with a `None` value are dropped here; the reverse
            // conversion re-wraps every value in `Some`.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
459
460impl From<&ParquetOptionsProto> for ParquetOptions {
461 fn from(proto: &ParquetOptionsProto) -> Self {
462 #[allow(deprecated)] ParquetOptions {
464 enable_page_index: proto.enable_page_index,
465 pruning: proto.pruning,
466 skip_metadata: proto.skip_metadata,
467 metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
468 parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
469 }),
470 pushdown_filters: proto.pushdown_filters,
471 reorder_filters: proto.reorder_filters,
472 data_pagesize_limit: proto.data_pagesize_limit as usize,
473 write_batch_size: proto.write_batch_size as usize,
474 writer_version: proto.writer_version.clone(),
475 compression: proto.compression_opt.as_ref().map(|opt| match opt {
476 parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
477 }),
478 dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
479 parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
480 }),
481 dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
482 statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
483 parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
484 }),
485 max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt {
486 parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize,
487 }),
488 max_row_group_size: proto.max_row_group_size as usize,
489 created_by: proto.created_by.clone(),
490 column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
491 parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
492 }),
493 statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
494 parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
495 }),
496 data_page_row_count_limit: proto.data_page_row_count_limit as usize,
497 encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
498 parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
499 }),
500 bloom_filter_on_read: proto.bloom_filter_on_read,
501 bloom_filter_on_write: proto.bloom_filter_on_write,
502 bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
503 parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
504 }),
505 bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
506 parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
507 }),
508 allow_single_file_parallelism: proto.allow_single_file_parallelism,
509 maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
510 maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
511 schema_force_view_types: proto.schema_force_view_types,
512 binary_as_string: proto.binary_as_string,
513 skip_arrow_metadata: proto.skip_arrow_metadata,
514 }
515 }
516}
517
518impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
519 fn from(proto: ParquetColumnOptionsProto) -> Self {
520 #[allow(deprecated)] ParquetColumnOptions {
522 bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
523 |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
524 ),
525 encoding: proto
526 .encoding_opt
527 .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
528 dictionary_enabled: proto.dictionary_enabled_opt.map(
529 |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
530 ),
531 compression: proto
532 .compression_opt
533 .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
534 statistics_enabled: proto.statistics_enabled_opt.map(
535 |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
536 ),
537 bloom_filter_fpp: proto
538 .bloom_filter_fpp_opt
539 .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
540 bloom_filter_ndv: proto
541 .bloom_filter_ndv_opt
542 .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
543 max_statistics_size: proto.max_statistics_size_opt.map(
544 |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| {
545 v as usize
546 },
547 ),
548 }
549 }
550}
551
552impl From<&TableParquetOptionsProto> for TableParquetOptions {
553 fn from(proto: &TableParquetOptionsProto) -> Self {
554 TableParquetOptions {
555 global: proto
556 .global
557 .as_ref()
558 .map(ParquetOptions::from)
559 .unwrap_or_default(),
560 column_specific_options: proto
561 .column_specific_options
562 .iter()
563 .map(|parquet_column_options| {
564 (
565 parquet_column_options.column_name.clone(),
566 ParquetColumnOptions::from(
567 parquet_column_options.options.clone().unwrap_or_default(),
568 ),
569 )
570 })
571 .collect(),
572 key_value_metadata: proto
573 .key_value_metadata
574 .iter()
575 .map(|(k, v)| (k.clone(), Some(v.clone())))
576 .collect(),
577 }
578 }
579}
580
/// [`LogicalExtensionCodec`] that serializes and deserializes Parquet
/// file-format options ([`TableParquetOptions`]) via their protobuf
/// representation.
///
/// Only the `try_{encode,decode}_file_format` methods are implemented; plan
/// and table-provider (de)serialization return "not implemented" errors.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
583
584impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
586 fn try_decode(
587 &self,
588 _buf: &[u8],
589 _inputs: &[datafusion_expr::LogicalPlan],
590 _ctx: &SessionContext,
591 ) -> datafusion_common::Result<datafusion_expr::Extension> {
592 not_impl_err!("Method not implemented")
593 }
594
595 fn try_encode(
596 &self,
597 _node: &datafusion_expr::Extension,
598 _buf: &mut Vec<u8>,
599 ) -> datafusion_common::Result<()> {
600 not_impl_err!("Method not implemented")
601 }
602
603 fn try_decode_table_provider(
604 &self,
605 _buf: &[u8],
606 _table_ref: &TableReference,
607 _schema: arrow::datatypes::SchemaRef,
608 _ctx: &SessionContext,
609 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
610 not_impl_err!("Method not implemented")
611 }
612
613 fn try_encode_table_provider(
614 &self,
615 _table_ref: &TableReference,
616 _node: Arc<dyn datafusion::datasource::TableProvider>,
617 _buf: &mut Vec<u8>,
618 ) -> datafusion_common::Result<()> {
619 not_impl_err!("Method not implemented")
620 }
621
622 fn try_decode_file_format(
623 &self,
624 buf: &[u8],
625 _ctx: &SessionContext,
626 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
627 let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
628 DataFusionError::Execution(format!(
629 "Failed to decode TableParquetOptionsProto: {:?}",
630 e
631 ))
632 })?;
633 let options: TableParquetOptions = (&proto).into();
634 Ok(Arc::new(ParquetFormatFactory {
635 options: Some(options),
636 }))
637 }
638
639 fn try_encode_file_format(
640 &self,
641 buf: &mut Vec<u8>,
642 node: Arc<dyn FileFormatFactory>,
643 ) -> datafusion_common::Result<()> {
644 let options = if let Some(parquet_factory) =
645 node.as_any().downcast_ref::<ParquetFormatFactory>()
646 {
647 parquet_factory.options.clone().unwrap_or_default()
648 } else {
649 return Err(DataFusionError::Execution(
650 "Unsupported FileFormatFactory type".to_string(),
651 ));
652 };
653
654 let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
655 options: Some(options),
656 });
657
658 proto.encode(buf).map_err(|e| {
659 DataFusionError::Execution(format!(
660 "Failed to encode TableParquetOptionsProto: {:?}",
661 e
662 ))
663 })?;
664
665 Ok(())
666 }
667}
668
/// [`LogicalExtensionCodec`] for the Arrow IPC file format.
///
/// Arrow IPC has no serializable format options, so `try_decode_file_format`
/// simply returns a fresh [`ArrowFormatFactory`] and `try_encode_file_format`
/// writes nothing. All other methods return "not implemented" errors.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
671
672impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
674 fn try_decode(
675 &self,
676 _buf: &[u8],
677 _inputs: &[datafusion_expr::LogicalPlan],
678 _ctx: &SessionContext,
679 ) -> datafusion_common::Result<datafusion_expr::Extension> {
680 not_impl_err!("Method not implemented")
681 }
682
683 fn try_encode(
684 &self,
685 _node: &datafusion_expr::Extension,
686 _buf: &mut Vec<u8>,
687 ) -> datafusion_common::Result<()> {
688 not_impl_err!("Method not implemented")
689 }
690
691 fn try_decode_table_provider(
692 &self,
693 _buf: &[u8],
694 _table_ref: &TableReference,
695 _schema: arrow::datatypes::SchemaRef,
696 _ctx: &SessionContext,
697 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
698 not_impl_err!("Method not implemented")
699 }
700
701 fn try_encode_table_provider(
702 &self,
703 _table_ref: &TableReference,
704 _node: Arc<dyn datafusion::datasource::TableProvider>,
705 _buf: &mut Vec<u8>,
706 ) -> datafusion_common::Result<()> {
707 not_impl_err!("Method not implemented")
708 }
709
710 fn try_decode_file_format(
711 &self,
712 __buf: &[u8],
713 __ctx: &SessionContext,
714 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
715 Ok(Arc::new(ArrowFormatFactory::new()))
716 }
717
718 fn try_encode_file_format(
719 &self,
720 __buf: &mut Vec<u8>,
721 __node: Arc<dyn FileFormatFactory>,
722 ) -> datafusion_common::Result<()> {
723 Ok(())
724 }
725}
726
/// [`LogicalExtensionCodec`] for the Avro file format.
///
/// No Avro format options are serialized; `try_encode_file_format` writes
/// nothing. All plan/table-provider methods return "not implemented" errors.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
729
730impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
732 fn try_decode(
733 &self,
734 _buf: &[u8],
735 _inputs: &[datafusion_expr::LogicalPlan],
736 _ctx: &SessionContext,
737 ) -> datafusion_common::Result<datafusion_expr::Extension> {
738 not_impl_err!("Method not implemented")
739 }
740
741 fn try_encode(
742 &self,
743 _node: &datafusion_expr::Extension,
744 _buf: &mut Vec<u8>,
745 ) -> datafusion_common::Result<()> {
746 not_impl_err!("Method not implemented")
747 }
748
749 fn try_decode_table_provider(
750 &self,
751 _buf: &[u8],
752 _table_ref: &TableReference,
753 _schema: arrow::datatypes::SchemaRef,
754 _cts: &SessionContext,
755 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
756 not_impl_err!("Method not implemented")
757 }
758
759 fn try_encode_table_provider(
760 &self,
761 _table_ref: &TableReference,
762 _node: Arc<dyn datafusion::datasource::TableProvider>,
763 _buf: &mut Vec<u8>,
764 ) -> datafusion_common::Result<()> {
765 not_impl_err!("Method not implemented")
766 }
767
768 fn try_decode_file_format(
769 &self,
770 __buf: &[u8],
771 __ctx: &SessionContext,
772 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
773 Ok(Arc::new(ArrowFormatFactory::new()))
774 }
775
776 fn try_encode_file_format(
777 &self,
778 __buf: &mut Vec<u8>,
779 __node: Arc<dyn FileFormatFactory>,
780 ) -> datafusion_common::Result<()> {
781 Ok(())
782 }
783}