use std::sync::Arc;

use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
    exec_datafusion_err, exec_err, not_impl_err, parsers::CompressionTypeVariant,
    TableReference,
};
use datafusion_datasource::file_format::FileFormatFactory;
use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
use datafusion_datasource_csv::file_format::CsvFormatFactory;
use datafusion_datasource_json::file_format::JsonFormatFactory;
use datafusion_execution::TaskContext;
use prost::Message;

use super::LogicalExtensionCodec;

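/// A [`LogicalExtensionCodec`] that round-trips [`CsvOptions`] through the
/// `CsvOptionsProto` protobuf message. Only the file-format hooks are
/// implemented; the plan and table-provider methods return `not_impl_err!`.
///
/// A minimal usage sketch (not compiled here; `ctx` is an assumed
/// [`TaskContext`] supplied by the calling environment):
///
/// ```ignore
/// let codec = CsvLogicalExtensionCodec;
/// let mut buf = Vec::new();
/// codec.try_encode_file_format(&mut buf, Arc::new(CsvFormatFactory::new()))?;
/// let factory = codec.try_decode_file_format(&buf, &ctx)?;
/// ```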
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;

impl CsvOptionsProto {
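    // Optional scalar fields (`has_header`, `terminator`, `escape`,
    // `double_quote`, `comment`, `newlines_in_values`, `truncated_rows`) are
    // encoded as protobuf `bytes`: an empty buffer means `None`, a single byte
    // means `Some` (booleans stored as 0/1). Optional strings instead use the
    // empty string as the `None` marker.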
    fn from_factory(factory: &CsvFormatFactory) -> Self {
        if let Some(options) = &factory.options {
            CsvOptionsProto {
                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
                delimiter: vec![options.delimiter],
                quote: vec![options.quote],
                terminator: options.terminator.map_or(vec![], |v| vec![v]),
                escape: options.escape.map_or(vec![], |v| vec![v]),
                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
                compression: options.compression as i32,
                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
                date_format: options.date_format.clone().unwrap_or_default(),
                datetime_format: options.datetime_format.clone().unwrap_or_default(),
                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
                timestamp_tz_format: options.timestamp_tz_format.clone().unwrap_or_default(),
                time_format: options.time_format.clone().unwrap_or_default(),
                null_value: options.null_value.clone().unwrap_or_default(),
                null_regex: options.null_regex.clone().unwrap_or_default(),
                comment: options.comment.map_or(vec![], |v| vec![v]),
                newlines_in_values: options.newlines_in_values.map_or(vec![], |v| vec![v as u8]),
                truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
            }
        } else {
            CsvOptionsProto::default()
        }
    }
}

impl From<&CsvOptionsProto> for CsvOptions {
    fn from(proto: &CsvOptionsProto) -> Self {
        CsvOptions {
            has_header: if !proto.has_header.is_empty() {
                Some(proto.has_header[0] != 0)
            } else {
                None
            },
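            // `delimiter` and `quote` are required; fall back to the
            // conventional CSV defaults (`,` and `"`) if the buffer is empty.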
            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
            quote: proto.quote.first().copied().unwrap_or(b'"'),
            terminator: if !proto.terminator.is_empty() {
                Some(proto.terminator[0])
            } else {
                None
            },
            escape: if !proto.escape.is_empty() {
                Some(proto.escape[0])
            } else {
                None
            },
            double_quote: if !proto.double_quote.is_empty() {
                Some(proto.double_quote[0] != 0)
            } else {
                None
            },
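            // The integer tags match `options.compression as i32` written on
            // the encode side; unrecognized values decode as UNCOMPRESSED.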
            compression: match proto.compression {
                0 => CompressionTypeVariant::GZIP,
                1 => CompressionTypeVariant::BZIP2,
                2 => CompressionTypeVariant::XZ,
                3 => CompressionTypeVariant::ZSTD,
                _ => CompressionTypeVariant::UNCOMPRESSED,
            },
            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
            date_format: if proto.date_format.is_empty() {
                None
            } else {
                Some(proto.date_format.clone())
            },
            datetime_format: if proto.datetime_format.is_empty() {
                None
            } else {
                Some(proto.datetime_format.clone())
            },
            timestamp_format: if proto.timestamp_format.is_empty() {
                None
            } else {
                Some(proto.timestamp_format.clone())
            },
            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
                None
            } else {
                Some(proto.timestamp_tz_format.clone())
            },
            time_format: if proto.time_format.is_empty() {
                None
            } else {
                Some(proto.time_format.clone())
            },
            null_value: if proto.null_value.is_empty() {
                None
            } else {
                Some(proto.null_value.clone())
            },
            null_regex: if proto.null_regex.is_empty() {
                None
            } else {
                Some(proto.null_regex.clone())
            },
            comment: if !proto.comment.is_empty() {
                Some(proto.comment[0])
            } else {
                None
            },
            newlines_in_values: if proto.newlines_in_values.is_empty() {
                None
            } else {
                Some(proto.newlines_in_values[0] != 0)
            },
            truncated_rows: if proto.truncated_rows.is_empty() {
                None
            } else {
                Some(proto.truncated_rows[0] != 0)
            },
        }
    }
}

impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
            exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
        })?;
        let options: CsvOptions = (&proto).into();
        Ok(Arc::new(CsvFormatFactory {
            options: Some(options),
        }))
    }

    fn try_encode_file_format(
        &self,
        buf: &mut Vec<u8>,
        node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        let options =
            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
                csv_factory.options.clone().unwrap_or_default()
            } else {
                return exec_err!("Unsupported FileFormatFactory type");
            };

        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
            options: Some(options),
        });

        proto
            .encode(buf)
            .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;

        Ok(())
    }
}

impl JsonOptionsProto {
    fn from_factory(factory: &JsonFormatFactory) -> Self {
        if let Some(options) = &factory.options {
            JsonOptionsProto {
                compression: options.compression as i32,
                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
            }
        } else {
            JsonOptionsProto::default()
        }
    }
}

impl From<&JsonOptionsProto> for JsonOptions {
    fn from(proto: &JsonOptionsProto) -> Self {
        JsonOptions {
            compression: match proto.compression {
                0 => CompressionTypeVariant::GZIP,
                1 => CompressionTypeVariant::BZIP2,
                2 => CompressionTypeVariant::XZ,
                3 => CompressionTypeVariant::ZSTD,
                _ => CompressionTypeVariant::UNCOMPRESSED,
            },
            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
        }
    }
}

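/// A [`LogicalExtensionCodec`] for [`JsonOptions`], mirroring the CSV codec:
/// only the file-format hooks are implemented, with options carried by the
/// `JsonOptionsProto` message.
///
/// A minimal usage sketch (not compiled here; `ctx` is an assumed
/// [`TaskContext`]):
///
/// ```ignore
/// let codec = JsonLogicalExtensionCodec;
/// let mut buf = Vec::new();
/// codec.try_encode_file_format(&mut buf, Arc::new(JsonFormatFactory::new()))?;
/// let factory = codec.try_decode_file_format(&buf, &ctx)?;
/// ```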
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;

impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
            exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
        })?;
        let options: JsonOptions = (&proto).into();
        Ok(Arc::new(JsonFormatFactory {
            options: Some(options),
        }))
    }

    fn try_encode_file_format(
        &self,
        buf: &mut Vec<u8>,
        node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        let options = if let Some(json_factory) =
            node.as_any().downcast_ref::<JsonFormatFactory>()
        {
            json_factory.options.clone().unwrap_or_default()
        } else {
            return exec_err!("Unsupported FileFormatFactory type");
        };

        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
            options: Some(options),
        });

        proto
            .encode(buf)
            .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;

        Ok(())
    }
}

#[cfg(feature = "parquet")]
mod parquet {
    use super::*;

    use crate::protobuf::{
        parquet_column_options, parquet_options,
        ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
        ParquetOptions as ParquetOptionsProto,
        TableParquetOptions as TableParquetOptionsProto,
    };
    use datafusion_common::config::{
        ParquetColumnOptions, ParquetOptions, TableParquetOptions,
    };
    use datafusion_datasource_parquet::file_format::ParquetFormatFactory;

    impl TableParquetOptionsProto {
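        // Maps each `ParquetOptions` field onto its proto counterpart, wrapping
        // optional values in the generated oneof types. The
        // `#[allow(deprecated)]` below keeps the mapping compiling while some
        // options remain deprecated upstream.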
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            let global_options = if let Some(ref options) = factory.options {
                options.clone()
            } else {
                return TableParquetOptionsProto::default();
            };

            let column_specific_options = global_options.column_specific_options;
            #[allow(deprecated)]
            TableParquetOptionsProto {
                global: Some(ParquetOptionsProto {
                    enable_page_index: global_options.global.enable_page_index,
                    pruning: global_options.global.pruning,
                    skip_metadata: global_options.global.skip_metadata,
                    metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                        parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                    }),
                    pushdown_filters: global_options.global.pushdown_filters,
                    reorder_filters: global_options.global.reorder_filters,
                    data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                    write_batch_size: global_options.global.write_batch_size as u64,
                    writer_version: global_options.global.writer_version.clone(),
                    compression_opt: global_options.global.compression.map(|compression| {
                        parquet_options::CompressionOpt::Compression(compression)
                    }),
                    dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                        parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                    }),
                    dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                    statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                        parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                    }),
                    max_row_group_size: global_options.global.max_row_group_size as u64,
                    created_by: global_options.global.created_by.clone(),
                    column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                        parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                    }),
                    statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                        parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                    }),
                    data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                    encoding_opt: global_options.global.encoding.map(|encoding| {
                        parquet_options::EncodingOpt::Encoding(encoding)
                    }),
                    bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                    bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                    bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                        parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                    }),
                    bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                        parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                    }),
                    allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                    maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                    maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                    schema_force_view_types: global_options.global.schema_force_view_types,
                    binary_as_string: global_options.global.binary_as_string,
                    skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                    coerce_int96_opt: global_options.global.coerce_int96.map(|coerce| {
                        parquet_options::CoerceInt96Opt::CoerceInt96(coerce)
                    }),
                    max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
                        parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
                    }),
                }),
                column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                    ParquetColumnSpecificOptions {
                        column_name,
                        options: Some(ParquetColumnOptionsProto {
                            bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                                parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                            }),
                            encoding_opt: options.encoding.map(|encoding| {
                                parquet_column_options::EncodingOpt::Encoding(encoding)
                            }),
                            dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                                parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                            }),
                            compression_opt: options.compression.map(|compression| {
                                parquet_column_options::CompressionOpt::Compression(compression)
                            }),
                            statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                                parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                            }),
                            bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                                parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                            }),
                            bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                                parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                            }),
                        })
                    }
                }).collect(),
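                // Entries whose value is `None` are dropped here; decoding
                // restores every surviving value as `Some`, so `None` values
                // do not survive a round trip.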
                key_value_metadata: global_options.key_value_metadata
                    .iter()
                    .filter_map(|(key, value)| {
                        value.as_ref().map(|v| (key.clone(), v.clone()))
                    })
                    .collect(),
            }
        }
    }

    impl From<&ParquetOptionsProto> for ParquetOptions {
        fn from(proto: &ParquetOptionsProto) -> Self {
            #[allow(deprecated)]
            ParquetOptions {
                enable_page_index: proto.enable_page_index,
                pruning: proto.pruning,
                skip_metadata: proto.skip_metadata,
                metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
                }),
                pushdown_filters: proto.pushdown_filters,
                reorder_filters: proto.reorder_filters,
                data_pagesize_limit: proto.data_pagesize_limit as usize,
                write_batch_size: proto.write_batch_size as usize,
                writer_version: proto.writer_version.clone(),
                compression: proto.compression_opt.as_ref().map(|opt| match opt {
                    parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
                }),
                dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
                }),
                dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
                statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
                }),
                max_row_group_size: proto.max_row_group_size as usize,
                created_by: proto.created_by.clone(),
                column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
                }),
                statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
                }),
                data_page_row_count_limit: proto.data_page_row_count_limit as usize,
                encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                    parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
                }),
                bloom_filter_on_read: proto.bloom_filter_on_read,
                bloom_filter_on_write: proto.bloom_filter_on_write,
                bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
                }),
                bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
                }),
                allow_single_file_parallelism: proto.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
                maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
                schema_force_view_types: proto.schema_force_view_types,
                binary_as_string: proto.binary_as_string,
                skip_arrow_metadata: proto.skip_arrow_metadata,
                coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                    parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
                }),
                max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
                    parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
                }),
            }
        }
    }

    impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
        fn from(proto: ParquetColumnOptionsProto) -> Self {
            #[allow(deprecated)]
            ParquetColumnOptions {
                bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
                    |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
                ),
                encoding: proto
                    .encoding_opt
                    .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
                dictionary_enabled: proto.dictionary_enabled_opt.map(
                    |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
                ),
                compression: proto
                    .compression_opt
                    .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
                statistics_enabled: proto.statistics_enabled_opt.map(
                    |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
                ),
                bloom_filter_fpp: proto
                    .bloom_filter_fpp_opt
                    .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
                bloom_filter_ndv: proto
                    .bloom_filter_ndv_opt
                    .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
            }
        }
    }

    impl From<&TableParquetOptionsProto> for TableParquetOptions {
        fn from(proto: &TableParquetOptionsProto) -> Self {
            TableParquetOptions {
                global: proto
                    .global
                    .as_ref()
                    .map(ParquetOptions::from)
                    .unwrap_or_default(),
                column_specific_options: proto
                    .column_specific_options
                    .iter()
                    .map(|parquet_column_options| {
                        (
                            parquet_column_options.column_name.clone(),
                            ParquetColumnOptions::from(
                                parquet_column_options
                                    .options
                                    .clone()
                                    .unwrap_or_default(),
                            ),
                        )
                    })
                    .collect(),
                key_value_metadata: proto
                    .key_value_metadata
                    .iter()
                    .map(|(k, v)| (k.clone(), Some(v.clone())))
                    .collect(),
                crypto: Default::default(),
            }
        }
    }

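    /// A [`LogicalExtensionCodec`] that round-trips [`TableParquetOptions`]
    /// (global options, per-column overrides, and key/value metadata) through
    /// `TableParquetOptionsProto`. As with the CSV and JSON codecs, only the
    /// file-format hooks are implemented.
    ///
    /// A minimal usage sketch (not compiled here; `ctx` is an assumed
    /// [`TaskContext`] from the calling environment):
    ///
    /// ```ignore
    /// let codec = ParquetLogicalExtensionCodec;
    /// let mut buf = Vec::new();
    /// codec.try_encode_file_format(&mut buf, Arc::new(ParquetFormatFactory::new()))?;
    /// let factory = codec.try_decode_file_format(&buf, &ctx)?;
    /// ```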
    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;

    impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[datafusion_expr::LogicalPlan],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<datafusion_expr::Extension> {
            not_impl_err!("Method not implemented")
        }

        fn try_encode(
            &self,
            _node: &datafusion_expr::Extension,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_table_provider(
            &self,
            _buf: &[u8],
            _table_ref: &TableReference,
            _schema: arrow::datatypes::SchemaRef,
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
            not_impl_err!("Method not implemented")
        }

        fn try_encode_table_provider(
            &self,
            _table_ref: &TableReference,
            _node: Arc<dyn datafusion_catalog::TableProvider>,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_file_format(
            &self,
            buf: &[u8],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
            let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
            })?;
            let options: TableParquetOptions = (&proto).into();
            Ok(Arc::new(ParquetFormatFactory {
                options: Some(options),
            }))
        }

        fn try_encode_file_format(
            &self,
            buf: &mut Vec<u8>,
            node: Arc<dyn FileFormatFactory>,
        ) -> datafusion_common::Result<()> {
            let options = if let Some(parquet_factory) =
                node.as_any().downcast_ref::<ParquetFormatFactory>()
            {
                parquet_factory.options.clone().unwrap_or_default()
            } else {
                return exec_err!("Unsupported FileFormatFactory type");
            };

            let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
                options: Some(options),
            });

            proto.encode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
            })?;

            Ok(())
        }
    }
}

#[cfg(feature = "parquet")]
pub use parquet::ParquetLogicalExtensionCodec;

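/// A [`LogicalExtensionCodec`] for the Arrow IPC file format. The format has
/// no serializable options, so encoding writes nothing and decoding returns a
/// fresh [`ArrowFormatFactory`].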
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;

impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        Ok(Arc::new(ArrowFormatFactory::new()))
    }

    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        Ok(())
    }
}

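/// A [`LogicalExtensionCodec`] for the Avro file format. Like the Arrow codec
/// it has no options to serialize; note that `try_decode_file_format`
/// currently hands back an [`ArrowFormatFactory`] rather than an
/// Avro-specific factory.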
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;

impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        Ok(Arc::new(ArrowFormatFactory::new()))
    }

    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        Ok(())
    }
}