1use std::sync::Arc;
19
20use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};
21use datafusion_common::config::{CsvOptions, JsonOptions};
22use datafusion_common::{
23 TableReference, exec_datafusion_err, exec_err, not_impl_err,
24 parsers::CompressionTypeVariant,
25};
26use datafusion_datasource::file_format::FileFormatFactory;
27use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
28use datafusion_datasource_csv::file_format::CsvFormatFactory;
29use datafusion_datasource_json::file_format::JsonFormatFactory;
30use datafusion_execution::TaskContext;
31use prost::Message;
32
33use super::LogicalExtensionCodec;
34
/// [`LogicalExtensionCodec`] that serializes CSV file-format options
/// ([`CsvOptions`]) to and from the [`CsvOptionsProto`] wire message.
/// Plan-node and table-provider (de)serialization are intentionally
/// unimplemented (see the `not_impl_err!` bodies in the impl below).
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
37
38impl CsvOptionsProto {
39 fn from_factory(factory: &CsvFormatFactory) -> Self {
40 if let Some(options) = &factory.options {
41 CsvOptionsProto {
42 has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
43 delimiter: vec![options.delimiter],
44 quote: vec![options.quote],
45 terminator: options.terminator.map_or(vec![], |v| vec![v]),
46 escape: options.escape.map_or(vec![], |v| vec![v]),
47 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
48 compression: options.compression as i32,
49 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
50 date_format: options.date_format.clone().unwrap_or_default(),
51 datetime_format: options.datetime_format.clone().unwrap_or_default(),
52 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
53 timestamp_tz_format: options
54 .timestamp_tz_format
55 .clone()
56 .unwrap_or_default(),
57 time_format: options.time_format.clone().unwrap_or_default(),
58 null_value: options.null_value.clone().unwrap_or_default(),
59 null_regex: options.null_regex.clone().unwrap_or_default(),
60 comment: options.comment.map_or(vec![], |v| vec![v]),
61 newlines_in_values: options
62 .newlines_in_values
63 .map_or(vec![], |v| vec![v as u8]),
64 truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
65 compression_level: options.compression_level,
66 }
67 } else {
68 CsvOptionsProto::default()
69 }
70 }
71}
72
73impl From<&CsvOptionsProto> for CsvOptions {
74 fn from(proto: &CsvOptionsProto) -> Self {
75 CsvOptions {
76 has_header: if !proto.has_header.is_empty() {
77 Some(proto.has_header[0] != 0)
78 } else {
79 None
80 },
81 delimiter: proto.delimiter.first().copied().unwrap_or(b','),
82 quote: proto.quote.first().copied().unwrap_or(b'"'),
83 terminator: if !proto.terminator.is_empty() {
84 Some(proto.terminator[0])
85 } else {
86 None
87 },
88 escape: if !proto.escape.is_empty() {
89 Some(proto.escape[0])
90 } else {
91 None
92 },
93 double_quote: if !proto.double_quote.is_empty() {
94 Some(proto.double_quote[0] != 0)
95 } else {
96 None
97 },
98 compression: match proto.compression {
99 0 => CompressionTypeVariant::GZIP,
100 1 => CompressionTypeVariant::BZIP2,
101 2 => CompressionTypeVariant::XZ,
102 3 => CompressionTypeVariant::ZSTD,
103 _ => CompressionTypeVariant::UNCOMPRESSED,
104 },
105 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
106 date_format: if proto.date_format.is_empty() {
107 None
108 } else {
109 Some(proto.date_format.clone())
110 },
111 datetime_format: if proto.datetime_format.is_empty() {
112 None
113 } else {
114 Some(proto.datetime_format.clone())
115 },
116 timestamp_format: if proto.timestamp_format.is_empty() {
117 None
118 } else {
119 Some(proto.timestamp_format.clone())
120 },
121 timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
122 None
123 } else {
124 Some(proto.timestamp_tz_format.clone())
125 },
126 time_format: if proto.time_format.is_empty() {
127 None
128 } else {
129 Some(proto.time_format.clone())
130 },
131 null_value: if proto.null_value.is_empty() {
132 None
133 } else {
134 Some(proto.null_value.clone())
135 },
136 null_regex: if proto.null_regex.is_empty() {
137 None
138 } else {
139 Some(proto.null_regex.clone())
140 },
141 comment: if !proto.comment.is_empty() {
142 Some(proto.comment[0])
143 } else {
144 None
145 },
146 newlines_in_values: if proto.newlines_in_values.is_empty() {
147 None
148 } else {
149 Some(proto.newlines_in_values[0] != 0)
150 },
151 truncated_rows: if proto.truncated_rows.is_empty() {
152 None
153 } else {
154 Some(proto.truncated_rows[0] != 0)
155 },
156 compression_level: proto.compression_level,
157 }
158 }
159}
160
161impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
163 fn try_decode(
164 &self,
165 _buf: &[u8],
166 _inputs: &[datafusion_expr::LogicalPlan],
167 _ctx: &TaskContext,
168 ) -> datafusion_common::Result<datafusion_expr::Extension> {
169 not_impl_err!("Method not implemented")
170 }
171
172 fn try_encode(
173 &self,
174 _node: &datafusion_expr::Extension,
175 _buf: &mut Vec<u8>,
176 ) -> datafusion_common::Result<()> {
177 not_impl_err!("Method not implemented")
178 }
179
180 fn try_decode_table_provider(
181 &self,
182 _buf: &[u8],
183 _table_ref: &TableReference,
184 _schema: arrow::datatypes::SchemaRef,
185 _ctx: &TaskContext,
186 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
187 not_impl_err!("Method not implemented")
188 }
189
190 fn try_encode_table_provider(
191 &self,
192 _table_ref: &TableReference,
193 _node: Arc<dyn datafusion_catalog::TableProvider>,
194 _buf: &mut Vec<u8>,
195 ) -> datafusion_common::Result<()> {
196 not_impl_err!("Method not implemented")
197 }
198
199 fn try_decode_file_format(
200 &self,
201 buf: &[u8],
202 _ctx: &TaskContext,
203 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
204 let proto = CsvOptionsProto::decode(buf).map_err(|e| {
205 exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
206 })?;
207 let options: CsvOptions = (&proto).into();
208 Ok(Arc::new(CsvFormatFactory {
209 options: Some(options),
210 }))
211 }
212
213 fn try_encode_file_format(
214 &self,
215 buf: &mut Vec<u8>,
216 node: Arc<dyn FileFormatFactory>,
217 ) -> datafusion_common::Result<()> {
218 let options =
219 if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
220 csv_factory.options.clone().unwrap_or_default()
221 } else {
222 return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
223 };
224
225 let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
226 options: Some(options),
227 });
228
229 proto
230 .encode(buf)
231 .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;
232
233 Ok(())
234 }
235}
236
237impl JsonOptionsProto {
238 fn from_factory(factory: &JsonFormatFactory) -> Self {
239 if let Some(options) = &factory.options {
240 JsonOptionsProto {
241 compression: options.compression as i32,
242 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
243 compression_level: options.compression_level,
244 newline_delimited: Some(options.newline_delimited),
245 }
246 } else {
247 JsonOptionsProto::default()
248 }
249 }
250}
251
252impl From<&JsonOptionsProto> for JsonOptions {
253 fn from(proto: &JsonOptionsProto) -> Self {
254 JsonOptions {
255 compression: match proto.compression {
256 0 => CompressionTypeVariant::GZIP,
257 1 => CompressionTypeVariant::BZIP2,
258 2 => CompressionTypeVariant::XZ,
259 3 => CompressionTypeVariant::ZSTD,
260 _ => CompressionTypeVariant::UNCOMPRESSED,
261 },
262 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
263 compression_level: proto.compression_level,
264 newline_delimited: proto.newline_delimited.unwrap_or(true),
265 }
266 }
267}
268
/// [`LogicalExtensionCodec`] that serializes JSON file-format options
/// ([`JsonOptions`]) to and from the [`JsonOptionsProto`] wire message.
/// Plan-node and table-provider (de)serialization are intentionally
/// unimplemented.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
271
272impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
274 fn try_decode(
275 &self,
276 _buf: &[u8],
277 _inputs: &[datafusion_expr::LogicalPlan],
278 _ctx: &TaskContext,
279 ) -> datafusion_common::Result<datafusion_expr::Extension> {
280 not_impl_err!("Method not implemented")
281 }
282
283 fn try_encode(
284 &self,
285 _node: &datafusion_expr::Extension,
286 _buf: &mut Vec<u8>,
287 ) -> datafusion_common::Result<()> {
288 not_impl_err!("Method not implemented")
289 }
290
291 fn try_decode_table_provider(
292 &self,
293 _buf: &[u8],
294 _table_ref: &TableReference,
295 _schema: arrow::datatypes::SchemaRef,
296 _ctx: &TaskContext,
297 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
298 not_impl_err!("Method not implemented")
299 }
300
301 fn try_encode_table_provider(
302 &self,
303 _table_ref: &TableReference,
304 _node: Arc<dyn datafusion_catalog::TableProvider>,
305 _buf: &mut Vec<u8>,
306 ) -> datafusion_common::Result<()> {
307 not_impl_err!("Method not implemented")
308 }
309
310 fn try_decode_file_format(
311 &self,
312 buf: &[u8],
313 _ctx: &TaskContext,
314 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
315 let proto = JsonOptionsProto::decode(buf).map_err(|e| {
316 exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
317 })?;
318 let options: JsonOptions = (&proto).into();
319 Ok(Arc::new(JsonFormatFactory {
320 options: Some(options),
321 }))
322 }
323
324 fn try_encode_file_format(
325 &self,
326 buf: &mut Vec<u8>,
327 node: Arc<dyn FileFormatFactory>,
328 ) -> datafusion_common::Result<()> {
329 let options = if let Some(json_factory) =
330 node.as_any().downcast_ref::<JsonFormatFactory>()
331 {
332 json_factory.options.clone().unwrap_or_default()
333 } else {
334 return exec_err!("Unsupported FileFormatFactory type");
335 };
336
337 let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
338 options: Some(options),
339 });
340
341 proto
342 .encode(buf)
343 .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;
344
345 Ok(())
346 }
347}
348
349#[cfg(feature = "parquet")]
350mod parquet {
351 use super::*;
352
353 use crate::protobuf::{
354 ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
355 ParquetOptions as ParquetOptionsProto,
356 TableParquetOptions as TableParquetOptionsProto, parquet_column_options,
357 parquet_options,
358 };
359 use datafusion_common::config::{
360 ParquetColumnOptions, ParquetOptions, TableParquetOptions,
361 };
362 use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
363
    impl TableParquetOptionsProto {
        /// Converts a factory's [`TableParquetOptions`] into the protobuf wire
        /// form. Optional scalar settings are wrapped in the prost-generated
        /// `oneof` helper enums (e.g. `MetadataSizeHintOpt`) so that absence is
        /// representable on the wire; `usize` fields are widened to `u64`.
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            // A factory without options encodes as the all-default proto.
            let global_options = if let Some(ref options) = factory.options {
                options.clone()
            } else {
                return TableParquetOptionsProto::default();
            };

            let column_specific_options = global_options.column_specific_options;
            TableParquetOptionsProto {
                // Table-wide reader/writer settings.
                global: Some(ParquetOptionsProto {
                    enable_page_index: global_options.global.enable_page_index,
                    pruning: global_options.global.pruning,
                    skip_metadata: global_options.global.skip_metadata,
                    metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                        parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                    }),
                    pushdown_filters: global_options.global.pushdown_filters,
                    reorder_filters: global_options.global.reorder_filters,
                    force_filter_selections: global_options.global.force_filter_selections,
                    data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                    write_batch_size: global_options.global.write_batch_size as u64,
                    writer_version: global_options.global.writer_version.to_string(),
                    compression_opt: global_options.global.compression.map(|compression| {
                        parquet_options::CompressionOpt::Compression(compression)
                    }),
                    dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                        parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                    }),
                    dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                    statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                        parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                    }),
                    max_row_group_size: global_options.global.max_row_group_size as u64,
                    created_by: global_options.global.created_by.clone(),
                    column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                        parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                    }),
                    statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                        parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                    }),
                    data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                    encoding_opt: global_options.global.encoding.map(|encoding| {
                        parquet_options::EncodingOpt::Encoding(encoding)
                    }),
                    bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                    bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                    bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                        parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                    }),
                    bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                        parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                    }),
                    allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                    maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                    maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                    schema_force_view_types: global_options.global.schema_force_view_types,
                    binary_as_string: global_options.global.binary_as_string,
                    skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                    coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                        parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                    }),
                    max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
                        parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
                    }),
                }),
                // Per-column overrides, keyed by column name; each optional
                // field is wrapped in its `oneof` helper like the globals above.
                column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                    ParquetColumnSpecificOptions {
                        column_name,
                        options: Some(ParquetColumnOptionsProto {
                            bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                                parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                            }),
                            encoding_opt: options.encoding.map(|encoding| {
                                parquet_column_options::EncodingOpt::Encoding(encoding)
                            }),
                            dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                                parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                            }),
                            compression_opt: options.compression.map(|compression| {
                                parquet_column_options::CompressionOpt::Compression(compression)
                            }),
                            statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                                parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                            }),
                            bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                                parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                            }),
                            bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                                parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                            }),
                        })
                    }
                }).collect(),
                // Only (key, Some(value)) pairs survive: entries whose value is
                // None are dropped from the wire representation, so the reverse
                // conversion cannot round-trip them.
                key_value_metadata: global_options.key_value_metadata
                    .iter()
                    .filter_map(|(key, value)| {
                        value.as_ref().map(|v| (key.clone(), v.clone()))
                    })
                    .collect(),
            }
        }
    }
467
    impl From<&ParquetOptionsProto> for ParquetOptions {
        /// Decodes the wire representation of the table-wide parquet settings.
        /// `u64` wire fields narrow back to `usize`; each `oneof` wrapper
        /// unwraps to the corresponding `Option` field.
        fn from(proto: &ParquetOptionsProto) -> Self {
            ParquetOptions {
                enable_page_index: proto.enable_page_index,
                pruning: proto.pruning,
                skip_metadata: proto.skip_metadata,
                metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
                }),
                pushdown_filters: proto.pushdown_filters,
                reorder_filters: proto.reorder_filters,
                force_filter_selections: proto.force_filter_selections,
                data_pagesize_limit: proto.data_pagesize_limit as usize,
                write_batch_size: proto.write_batch_size as usize,
                // NOTE(review): `parse().expect(...)` panics on an
                // unrecognized version string, so a corrupted/hand-crafted
                // proto aborts instead of returning an error — confirm this
                // is acceptable for this deserialization path.
                writer_version: proto.writer_version.parse().expect("
 Invalid parquet writer version in proto, expected '1.0' or '2.0'
 "),
                compression: proto.compression_opt.as_ref().map(|opt| match opt {
                    parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
                }),
                dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
                }),
                dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
                statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
                }),
                max_row_group_size: proto.max_row_group_size as usize,
                created_by: proto.created_by.clone(),
                column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
                }),
                statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
                }),
                data_page_row_count_limit: proto.data_page_row_count_limit as usize,
                encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                    parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
                }),
                bloom_filter_on_read: proto.bloom_filter_on_read,
                bloom_filter_on_write: proto.bloom_filter_on_write,
                bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
                }),
                bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
                }),
                allow_single_file_parallelism: proto.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
                maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
                schema_force_view_types: proto.schema_force_view_types,
                binary_as_string: proto.binary_as_string,
                skip_arrow_metadata: proto.skip_arrow_metadata,
                coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                    parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
                }),
                max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
                    parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
                }),
            }
        }
    }
531
532 impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
533 fn from(proto: ParquetColumnOptionsProto) -> Self {
534 ParquetColumnOptions {
535 bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
536 |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
537 ),
538 encoding: proto
539 .encoding_opt
540 .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
541 dictionary_enabled: proto.dictionary_enabled_opt.map(
542 |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
543 ),
544 compression: proto
545 .compression_opt
546 .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
547 statistics_enabled: proto.statistics_enabled_opt.map(
548 |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
549 ),
550 bloom_filter_fpp: proto
551 .bloom_filter_fpp_opt
552 .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
553 bloom_filter_ndv: proto
554 .bloom_filter_ndv_opt
555 .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
556 }
557 }
558 }
559
560 impl From<&TableParquetOptionsProto> for TableParquetOptions {
561 fn from(proto: &TableParquetOptionsProto) -> Self {
562 TableParquetOptions {
563 global: proto
564 .global
565 .as_ref()
566 .map(ParquetOptions::from)
567 .unwrap_or_default(),
568 column_specific_options: proto
569 .column_specific_options
570 .iter()
571 .map(|parquet_column_options| {
572 (
573 parquet_column_options.column_name.clone(),
574 ParquetColumnOptions::from(
575 parquet_column_options
576 .options
577 .clone()
578 .unwrap_or_default(),
579 ),
580 )
581 })
582 .collect(),
583 key_value_metadata: proto
584 .key_value_metadata
585 .iter()
586 .map(|(k, v)| (k.clone(), Some(v.clone())))
587 .collect(),
588 crypto: Default::default(),
589 }
590 }
591 }
592
    /// [`LogicalExtensionCodec`] that serializes parquet file-format options
    /// ([`TableParquetOptions`]) to and from the [`TableParquetOptionsProto`]
    /// wire message. Plan-node and table-provider (de)serialization are
    /// intentionally unimplemented.
    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;
595
596 impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
598 fn try_decode(
599 &self,
600 _buf: &[u8],
601 _inputs: &[datafusion_expr::LogicalPlan],
602 _ctx: &TaskContext,
603 ) -> datafusion_common::Result<datafusion_expr::Extension> {
604 not_impl_err!("Method not implemented")
605 }
606
607 fn try_encode(
608 &self,
609 _node: &datafusion_expr::Extension,
610 _buf: &mut Vec<u8>,
611 ) -> datafusion_common::Result<()> {
612 not_impl_err!("Method not implemented")
613 }
614
615 fn try_decode_table_provider(
616 &self,
617 _buf: &[u8],
618 _table_ref: &TableReference,
619 _schema: arrow::datatypes::SchemaRef,
620 _ctx: &TaskContext,
621 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>>
622 {
623 not_impl_err!("Method not implemented")
624 }
625
626 fn try_encode_table_provider(
627 &self,
628 _table_ref: &TableReference,
629 _node: Arc<dyn datafusion_catalog::TableProvider>,
630 _buf: &mut Vec<u8>,
631 ) -> datafusion_common::Result<()> {
632 not_impl_err!("Method not implemented")
633 }
634
635 fn try_decode_file_format(
636 &self,
637 buf: &[u8],
638 _ctx: &TaskContext,
639 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
640 let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
641 exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
642 })?;
643 let options: TableParquetOptions = (&proto).into();
644 Ok(Arc::new(
645 datafusion_datasource_parquet::file_format::ParquetFormatFactory {
646 options: Some(options),
647 },
648 ))
649 }
650
651 fn try_encode_file_format(
652 &self,
653 buf: &mut Vec<u8>,
654 node: Arc<dyn FileFormatFactory>,
655 ) -> datafusion_common::Result<()> {
656 use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
657
658 let options = if let Some(parquet_factory) =
659 node.as_any().downcast_ref::<ParquetFormatFactory>()
660 {
661 parquet_factory.options.clone().unwrap_or_default()
662 } else {
663 return exec_err!("Unsupported FileFormatFactory type");
664 };
665
666 let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
667 options: Some(options),
668 });
669
670 proto.encode(buf).map_err(|e| {
671 exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
672 })?;
673
674 Ok(())
675 }
676 }
677}
678#[cfg(feature = "parquet")]
679pub use parquet::ParquetLogicalExtensionCodec;
680
/// [`LogicalExtensionCodec`] for the Arrow IPC file format. Arrow has no
/// configurable file-format options here, so encoding writes nothing and
/// decoding returns a fresh [`ArrowFormatFactory`]. Plan-node and
/// table-provider (de)serialization are intentionally unimplemented.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
683
684impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
686 fn try_decode(
687 &self,
688 _buf: &[u8],
689 _inputs: &[datafusion_expr::LogicalPlan],
690 _ctx: &TaskContext,
691 ) -> datafusion_common::Result<datafusion_expr::Extension> {
692 not_impl_err!("Method not implemented")
693 }
694
695 fn try_encode(
696 &self,
697 _node: &datafusion_expr::Extension,
698 _buf: &mut Vec<u8>,
699 ) -> datafusion_common::Result<()> {
700 not_impl_err!("Method not implemented")
701 }
702
703 fn try_decode_table_provider(
704 &self,
705 _buf: &[u8],
706 _table_ref: &TableReference,
707 _schema: arrow::datatypes::SchemaRef,
708 _ctx: &TaskContext,
709 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
710 not_impl_err!("Method not implemented")
711 }
712
713 fn try_encode_table_provider(
714 &self,
715 _table_ref: &TableReference,
716 _node: Arc<dyn datafusion_catalog::TableProvider>,
717 _buf: &mut Vec<u8>,
718 ) -> datafusion_common::Result<()> {
719 not_impl_err!("Method not implemented")
720 }
721
722 fn try_decode_file_format(
723 &self,
724 __buf: &[u8],
725 __ctx: &TaskContext,
726 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
727 Ok(Arc::new(ArrowFormatFactory::new()))
728 }
729
730 fn try_encode_file_format(
731 &self,
732 __buf: &mut Vec<u8>,
733 __node: Arc<dyn FileFormatFactory>,
734 ) -> datafusion_common::Result<()> {
735 Ok(())
736 }
737}
738
/// [`LogicalExtensionCodec`] for the Avro file format. No Avro options are
/// carried on the wire: encoding writes nothing and decoding returns a
/// format factory (see the NOTE in `try_decode_file_format`). Plan-node and
/// table-provider (de)serialization are intentionally unimplemented.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
741
742impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
744 fn try_decode(
745 &self,
746 _buf: &[u8],
747 _inputs: &[datafusion_expr::LogicalPlan],
748 _ctx: &TaskContext,
749 ) -> datafusion_common::Result<datafusion_expr::Extension> {
750 not_impl_err!("Method not implemented")
751 }
752
753 fn try_encode(
754 &self,
755 _node: &datafusion_expr::Extension,
756 _buf: &mut Vec<u8>,
757 ) -> datafusion_common::Result<()> {
758 not_impl_err!("Method not implemented")
759 }
760
761 fn try_decode_table_provider(
762 &self,
763 _buf: &[u8],
764 _table_ref: &TableReference,
765 _schema: arrow::datatypes::SchemaRef,
766 _cts: &TaskContext,
767 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
768 not_impl_err!("Method not implemented")
769 }
770
771 fn try_encode_table_provider(
772 &self,
773 _table_ref: &TableReference,
774 _node: Arc<dyn datafusion_catalog::TableProvider>,
775 _buf: &mut Vec<u8>,
776 ) -> datafusion_common::Result<()> {
777 not_impl_err!("Method not implemented")
778 }
779
780 fn try_decode_file_format(
781 &self,
782 __buf: &[u8],
783 __ctx: &TaskContext,
784 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
785 Ok(Arc::new(ArrowFormatFactory::new()))
786 }
787
788 fn try_encode_file_format(
789 &self,
790 __buf: &mut Vec<u8>,
791 __node: Arc<dyn FileFormatFactory>,
792 ) -> datafusion_common::Result<()> {
793 Ok(())
794 }
795}