1use std::sync::Arc;
19
20use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};
21use datafusion_common::config::{CsvOptions, JsonOptions};
22use datafusion_common::{
23 TableReference, exec_datafusion_err, exec_err, not_impl_err,
24 parsers::CompressionTypeVariant,
25};
26use datafusion_datasource::file_format::FileFormatFactory;
27use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
28use datafusion_datasource_csv::file_format::CsvFormatFactory;
29use datafusion_datasource_json::file_format::JsonFormatFactory;
30use datafusion_execution::TaskContext;
31use prost::Message;
32
33use super::LogicalExtensionCodec;
34
/// A [`LogicalExtensionCodec`] that (de)serializes CSV file-format options
/// ([`CsvOptions`]) via their protobuf representation ([`CsvOptionsProto`]).
/// Only the file-format methods are implemented; plan/table-provider
/// encoding is unsupported.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
37
38impl CsvOptionsProto {
39 fn from_factory(factory: &CsvFormatFactory) -> Self {
40 if let Some(options) = &factory.options {
41 CsvOptionsProto {
42 has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
43 delimiter: vec![options.delimiter],
44 quote: vec![options.quote],
45 terminator: options.terminator.map_or(vec![], |v| vec![v]),
46 escape: options.escape.map_or(vec![], |v| vec![v]),
47 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
48 compression: options.compression as i32,
49 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
50 date_format: options.date_format.clone().unwrap_or_default(),
51 datetime_format: options.datetime_format.clone().unwrap_or_default(),
52 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
53 timestamp_tz_format: options
54 .timestamp_tz_format
55 .clone()
56 .unwrap_or_default(),
57 time_format: options.time_format.clone().unwrap_or_default(),
58 null_value: options.null_value.clone().unwrap_or_default(),
59 null_regex: options.null_regex.clone().unwrap_or_default(),
60 comment: options.comment.map_or(vec![], |v| vec![v]),
61 newlines_in_values: options
62 .newlines_in_values
63 .map_or(vec![], |v| vec![v as u8]),
64 truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
65 compression_level: options.compression_level,
66 }
67 } else {
68 CsvOptionsProto::default()
69 }
70 }
71}
72
73impl From<&CsvOptionsProto> for CsvOptions {
74 fn from(proto: &CsvOptionsProto) -> Self {
75 CsvOptions {
76 has_header: if !proto.has_header.is_empty() {
77 Some(proto.has_header[0] != 0)
78 } else {
79 None
80 },
81 delimiter: proto.delimiter.first().copied().unwrap_or(b','),
82 quote: proto.quote.first().copied().unwrap_or(b'"'),
83 terminator: if !proto.terminator.is_empty() {
84 Some(proto.terminator[0])
85 } else {
86 None
87 },
88 escape: if !proto.escape.is_empty() {
89 Some(proto.escape[0])
90 } else {
91 None
92 },
93 double_quote: if !proto.double_quote.is_empty() {
94 Some(proto.double_quote[0] != 0)
95 } else {
96 None
97 },
98 compression: match proto.compression {
99 0 => CompressionTypeVariant::GZIP,
100 1 => CompressionTypeVariant::BZIP2,
101 2 => CompressionTypeVariant::XZ,
102 3 => CompressionTypeVariant::ZSTD,
103 _ => CompressionTypeVariant::UNCOMPRESSED,
104 },
105 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
106 date_format: if proto.date_format.is_empty() {
107 None
108 } else {
109 Some(proto.date_format.clone())
110 },
111 datetime_format: if proto.datetime_format.is_empty() {
112 None
113 } else {
114 Some(proto.datetime_format.clone())
115 },
116 timestamp_format: if proto.timestamp_format.is_empty() {
117 None
118 } else {
119 Some(proto.timestamp_format.clone())
120 },
121 timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
122 None
123 } else {
124 Some(proto.timestamp_tz_format.clone())
125 },
126 time_format: if proto.time_format.is_empty() {
127 None
128 } else {
129 Some(proto.time_format.clone())
130 },
131 null_value: if proto.null_value.is_empty() {
132 None
133 } else {
134 Some(proto.null_value.clone())
135 },
136 null_regex: if proto.null_regex.is_empty() {
137 None
138 } else {
139 Some(proto.null_regex.clone())
140 },
141 comment: if !proto.comment.is_empty() {
142 Some(proto.comment[0])
143 } else {
144 None
145 },
146 newlines_in_values: if proto.newlines_in_values.is_empty() {
147 None
148 } else {
149 Some(proto.newlines_in_values[0] != 0)
150 },
151 truncated_rows: if proto.truncated_rows.is_empty() {
152 None
153 } else {
154 Some(proto.truncated_rows[0] != 0)
155 },
156 compression_level: proto.compression_level,
157 }
158 }
159}
160
161impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
163 fn try_decode(
164 &self,
165 _buf: &[u8],
166 _inputs: &[datafusion_expr::LogicalPlan],
167 _ctx: &TaskContext,
168 ) -> datafusion_common::Result<datafusion_expr::Extension> {
169 not_impl_err!("Method not implemented")
170 }
171
172 fn try_encode(
173 &self,
174 _node: &datafusion_expr::Extension,
175 _buf: &mut Vec<u8>,
176 ) -> datafusion_common::Result<()> {
177 not_impl_err!("Method not implemented")
178 }
179
180 fn try_decode_table_provider(
181 &self,
182 _buf: &[u8],
183 _table_ref: &TableReference,
184 _schema: arrow::datatypes::SchemaRef,
185 _ctx: &TaskContext,
186 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
187 not_impl_err!("Method not implemented")
188 }
189
190 fn try_encode_table_provider(
191 &self,
192 _table_ref: &TableReference,
193 _node: Arc<dyn datafusion_catalog::TableProvider>,
194 _buf: &mut Vec<u8>,
195 ) -> datafusion_common::Result<()> {
196 not_impl_err!("Method not implemented")
197 }
198
199 fn try_decode_file_format(
200 &self,
201 buf: &[u8],
202 _ctx: &TaskContext,
203 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
204 let proto = CsvOptionsProto::decode(buf).map_err(|e| {
205 exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
206 })?;
207 let options: CsvOptions = (&proto).into();
208 Ok(Arc::new(CsvFormatFactory {
209 options: Some(options),
210 }))
211 }
212
213 fn try_encode_file_format(
214 &self,
215 buf: &mut Vec<u8>,
216 node: Arc<dyn FileFormatFactory>,
217 ) -> datafusion_common::Result<()> {
218 let options =
219 if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
220 csv_factory.options.clone().unwrap_or_default()
221 } else {
222 return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
223 };
224
225 let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
226 options: Some(options),
227 });
228
229 proto
230 .encode(buf)
231 .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;
232
233 Ok(())
234 }
235}
236
237impl JsonOptionsProto {
238 fn from_factory(factory: &JsonFormatFactory) -> Self {
239 if let Some(options) = &factory.options {
240 JsonOptionsProto {
241 compression: options.compression as i32,
242 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
243 compression_level: options.compression_level,
244 }
245 } else {
246 JsonOptionsProto::default()
247 }
248 }
249}
250
251impl From<&JsonOptionsProto> for JsonOptions {
252 fn from(proto: &JsonOptionsProto) -> Self {
253 JsonOptions {
254 compression: match proto.compression {
255 0 => CompressionTypeVariant::GZIP,
256 1 => CompressionTypeVariant::BZIP2,
257 2 => CompressionTypeVariant::XZ,
258 3 => CompressionTypeVariant::ZSTD,
259 _ => CompressionTypeVariant::UNCOMPRESSED,
260 },
261 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
262 compression_level: proto.compression_level,
263 }
264 }
265}
266
/// A [`LogicalExtensionCodec`] that (de)serializes JSON file-format options
/// ([`JsonOptions`]) via their protobuf representation ([`JsonOptionsProto`]).
/// Only the file-format methods are implemented; plan/table-provider
/// encoding is unsupported.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
269
270impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
272 fn try_decode(
273 &self,
274 _buf: &[u8],
275 _inputs: &[datafusion_expr::LogicalPlan],
276 _ctx: &TaskContext,
277 ) -> datafusion_common::Result<datafusion_expr::Extension> {
278 not_impl_err!("Method not implemented")
279 }
280
281 fn try_encode(
282 &self,
283 _node: &datafusion_expr::Extension,
284 _buf: &mut Vec<u8>,
285 ) -> datafusion_common::Result<()> {
286 not_impl_err!("Method not implemented")
287 }
288
289 fn try_decode_table_provider(
290 &self,
291 _buf: &[u8],
292 _table_ref: &TableReference,
293 _schema: arrow::datatypes::SchemaRef,
294 _ctx: &TaskContext,
295 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
296 not_impl_err!("Method not implemented")
297 }
298
299 fn try_encode_table_provider(
300 &self,
301 _table_ref: &TableReference,
302 _node: Arc<dyn datafusion_catalog::TableProvider>,
303 _buf: &mut Vec<u8>,
304 ) -> datafusion_common::Result<()> {
305 not_impl_err!("Method not implemented")
306 }
307
308 fn try_decode_file_format(
309 &self,
310 buf: &[u8],
311 _ctx: &TaskContext,
312 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
313 let proto = JsonOptionsProto::decode(buf).map_err(|e| {
314 exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
315 })?;
316 let options: JsonOptions = (&proto).into();
317 Ok(Arc::new(JsonFormatFactory {
318 options: Some(options),
319 }))
320 }
321
322 fn try_encode_file_format(
323 &self,
324 buf: &mut Vec<u8>,
325 node: Arc<dyn FileFormatFactory>,
326 ) -> datafusion_common::Result<()> {
327 let options = if let Some(json_factory) =
328 node.as_any().downcast_ref::<JsonFormatFactory>()
329 {
330 json_factory.options.clone().unwrap_or_default()
331 } else {
332 return exec_err!("Unsupported FileFormatFactory type");
333 };
334
335 let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
336 options: Some(options),
337 });
338
339 proto
340 .encode(buf)
341 .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;
342
343 Ok(())
344 }
345}
346
347#[cfg(feature = "parquet")]
348mod parquet {
349 use super::*;
350
351 use crate::protobuf::{
352 ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
353 ParquetOptions as ParquetOptionsProto,
354 TableParquetOptions as TableParquetOptionsProto, parquet_column_options,
355 parquet_options,
356 };
357 use datafusion_common::config::{
358 ParquetColumnOptions, ParquetOptions, TableParquetOptions,
359 };
360 use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
361
    impl TableParquetOptionsProto {
        /// Serializes the [`TableParquetOptions`] held by a [`ParquetFormatFactory`]
        /// into the protobuf wire representation.
        ///
        /// Optional scalars are wrapped in the prost-generated `oneof` helper types
        /// (e.g. `parquet_options::MetadataSizeHintOpt`), and `usize` fields are
        /// widened to `u64` for the wire. A factory with no options yields the
        /// default proto.
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            // No options configured: nothing to serialize beyond defaults.
            let global_options = if let Some(ref options) = factory.options {
                options.clone()
            } else {
                return TableParquetOptionsProto::default();
            };

            let column_specific_options = global_options.column_specific_options;
            TableParquetOptionsProto {
                // Session-wide writer/reader options.
                global: Some(ParquetOptionsProto {
                    enable_page_index: global_options.global.enable_page_index,
                    pruning: global_options.global.pruning,
                    skip_metadata: global_options.global.skip_metadata,
                    metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                        parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                    }),
                    pushdown_filters: global_options.global.pushdown_filters,
                    reorder_filters: global_options.global.reorder_filters,
                    force_filter_selections: global_options.global.force_filter_selections,
                    data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                    write_batch_size: global_options.global.write_batch_size as u64,
                    writer_version: global_options.global.writer_version.to_string(),
                    compression_opt: global_options.global.compression.map(|compression| {
                        parquet_options::CompressionOpt::Compression(compression)
                    }),
                    dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                        parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                    }),
                    dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                    statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                        parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                    }),
                    max_row_group_size: global_options.global.max_row_group_size as u64,
                    created_by: global_options.global.created_by.clone(),
                    column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                        parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                    }),
                    statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                        parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                    }),
                    data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                    encoding_opt: global_options.global.encoding.map(|encoding| {
                        parquet_options::EncodingOpt::Encoding(encoding)
                    }),
                    bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                    bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                    bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                        parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                    }),
                    bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                        parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                    }),
                    allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                    maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                    maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                    schema_force_view_types: global_options.global.schema_force_view_types,
                    binary_as_string: global_options.global.binary_as_string,
                    skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                    coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                        parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                    }),
                    max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
                        parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
                    }),
                }),
                // Per-column overrides, keyed by column name.
                column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                    ParquetColumnSpecificOptions {
                        column_name,
                        options: Some(ParquetColumnOptionsProto {
                            bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                                parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                            }),
                            encoding_opt: options.encoding.map(|encoding| {
                                parquet_column_options::EncodingOpt::Encoding(encoding)
                            }),
                            dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                                parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                            }),
                            compression_opt: options.compression.map(|compression| {
                                parquet_column_options::CompressionOpt::Compression(compression)
                            }),
                            statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                                parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                            }),
                            bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                                parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                            }),
                            bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                                parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                            }),
                        })
                    }
                }).collect(),
                // Entries whose value is `None` are dropped: the proto map
                // stores plain (non-optional) string values.
                key_value_metadata: global_options.key_value_metadata
                    .iter()
                    .filter_map(|(key, value)| {
                        value.as_ref().map(|v| (key.clone(), v.clone()))
                    })
                    .collect(),
            }
        }
    }
465
    impl From<&ParquetOptionsProto> for ParquetOptions {
        /// Reconstructs [`ParquetOptions`] from the protobuf representation,
        /// unwrapping the prost `oneof` wrappers and narrowing `u64` wire
        /// values back to `usize`.
        ///
        /// # Panics
        /// Panics if `writer_version` does not parse as a valid parquet
        /// writer version (expected "1.0" or "2.0").
        // NOTE(review): a malformed proto therefore aborts instead of
        // returning an error — `From` cannot be fallible; consider TryFrom
        // if this ever becomes reachable from untrusted input.
        fn from(proto: &ParquetOptionsProto) -> Self {
            ParquetOptions {
                enable_page_index: proto.enable_page_index,
                pruning: proto.pruning,
                skip_metadata: proto.skip_metadata,
                metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
                }),
                pushdown_filters: proto.pushdown_filters,
                reorder_filters: proto.reorder_filters,
                force_filter_selections: proto.force_filter_selections,
                data_pagesize_limit: proto.data_pagesize_limit as usize,
                write_batch_size: proto.write_batch_size as usize,
                writer_version: proto.writer_version.parse().expect("
            Invalid parquet writer version in proto, expected '1.0' or '2.0'
            "),
                compression: proto.compression_opt.as_ref().map(|opt| match opt {
                    parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
                }),
                dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
                }),
                dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
                statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
                }),
                max_row_group_size: proto.max_row_group_size as usize,
                created_by: proto.created_by.clone(),
                column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
                }),
                statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
                }),
                data_page_row_count_limit: proto.data_page_row_count_limit as usize,
                encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                    parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
                }),
                bloom_filter_on_read: proto.bloom_filter_on_read,
                bloom_filter_on_write: proto.bloom_filter_on_write,
                bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
                }),
                bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
                }),
                allow_single_file_parallelism: proto.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
                maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
                schema_force_view_types: proto.schema_force_view_types,
                binary_as_string: proto.binary_as_string,
                skip_arrow_metadata: proto.skip_arrow_metadata,
                coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                    parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
                }),
                max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
                    parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
                }),
            }
        }
    }
529
530 impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
531 fn from(proto: ParquetColumnOptionsProto) -> Self {
532 ParquetColumnOptions {
533 bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
534 |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
535 ),
536 encoding: proto
537 .encoding_opt
538 .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
539 dictionary_enabled: proto.dictionary_enabled_opt.map(
540 |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
541 ),
542 compression: proto
543 .compression_opt
544 .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
545 statistics_enabled: proto.statistics_enabled_opt.map(
546 |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
547 ),
548 bloom_filter_fpp: proto
549 .bloom_filter_fpp_opt
550 .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
551 bloom_filter_ndv: proto
552 .bloom_filter_ndv_opt
553 .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
554 }
555 }
556 }
557
558 impl From<&TableParquetOptionsProto> for TableParquetOptions {
559 fn from(proto: &TableParquetOptionsProto) -> Self {
560 TableParquetOptions {
561 global: proto
562 .global
563 .as_ref()
564 .map(ParquetOptions::from)
565 .unwrap_or_default(),
566 column_specific_options: proto
567 .column_specific_options
568 .iter()
569 .map(|parquet_column_options| {
570 (
571 parquet_column_options.column_name.clone(),
572 ParquetColumnOptions::from(
573 parquet_column_options
574 .options
575 .clone()
576 .unwrap_or_default(),
577 ),
578 )
579 })
580 .collect(),
581 key_value_metadata: proto
582 .key_value_metadata
583 .iter()
584 .map(|(k, v)| (k.clone(), Some(v.clone())))
585 .collect(),
586 crypto: Default::default(),
587 }
588 }
589 }
590
    /// A [`LogicalExtensionCodec`] that (de)serializes Parquet table options
    /// ([`TableParquetOptions`]) via their protobuf representation
    /// ([`TableParquetOptionsProto`]). Only the file-format methods are
    /// implemented; plan/table-provider encoding is unsupported.
    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;
593
594 impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
596 fn try_decode(
597 &self,
598 _buf: &[u8],
599 _inputs: &[datafusion_expr::LogicalPlan],
600 _ctx: &TaskContext,
601 ) -> datafusion_common::Result<datafusion_expr::Extension> {
602 not_impl_err!("Method not implemented")
603 }
604
605 fn try_encode(
606 &self,
607 _node: &datafusion_expr::Extension,
608 _buf: &mut Vec<u8>,
609 ) -> datafusion_common::Result<()> {
610 not_impl_err!("Method not implemented")
611 }
612
613 fn try_decode_table_provider(
614 &self,
615 _buf: &[u8],
616 _table_ref: &TableReference,
617 _schema: arrow::datatypes::SchemaRef,
618 _ctx: &TaskContext,
619 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>>
620 {
621 not_impl_err!("Method not implemented")
622 }
623
624 fn try_encode_table_provider(
625 &self,
626 _table_ref: &TableReference,
627 _node: Arc<dyn datafusion_catalog::TableProvider>,
628 _buf: &mut Vec<u8>,
629 ) -> datafusion_common::Result<()> {
630 not_impl_err!("Method not implemented")
631 }
632
633 fn try_decode_file_format(
634 &self,
635 buf: &[u8],
636 _ctx: &TaskContext,
637 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
638 let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
639 exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
640 })?;
641 let options: TableParquetOptions = (&proto).into();
642 Ok(Arc::new(
643 datafusion_datasource_parquet::file_format::ParquetFormatFactory {
644 options: Some(options),
645 },
646 ))
647 }
648
649 fn try_encode_file_format(
650 &self,
651 buf: &mut Vec<u8>,
652 node: Arc<dyn FileFormatFactory>,
653 ) -> datafusion_common::Result<()> {
654 use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
655
656 let options = if let Some(parquet_factory) =
657 node.as_any().downcast_ref::<ParquetFormatFactory>()
658 {
659 parquet_factory.options.clone().unwrap_or_default()
660 } else {
661 return exec_err!("Unsupported FileFormatFactory type");
662 };
663
664 let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
665 options: Some(options),
666 });
667
668 proto.encode(buf).map_err(|e| {
669 exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
670 })?;
671
672 Ok(())
673 }
674 }
675}
676#[cfg(feature = "parquet")]
677pub use parquet::ParquetLogicalExtensionCodec;
678
/// A [`LogicalExtensionCodec`] for the Arrow IPC file format. The format
/// carries no options in this codec, so file-format (de)serialization is a
/// no-op; plan/table-provider encoding is unsupported.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
681
682impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
684 fn try_decode(
685 &self,
686 _buf: &[u8],
687 _inputs: &[datafusion_expr::LogicalPlan],
688 _ctx: &TaskContext,
689 ) -> datafusion_common::Result<datafusion_expr::Extension> {
690 not_impl_err!("Method not implemented")
691 }
692
693 fn try_encode(
694 &self,
695 _node: &datafusion_expr::Extension,
696 _buf: &mut Vec<u8>,
697 ) -> datafusion_common::Result<()> {
698 not_impl_err!("Method not implemented")
699 }
700
701 fn try_decode_table_provider(
702 &self,
703 _buf: &[u8],
704 _table_ref: &TableReference,
705 _schema: arrow::datatypes::SchemaRef,
706 _ctx: &TaskContext,
707 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
708 not_impl_err!("Method not implemented")
709 }
710
711 fn try_encode_table_provider(
712 &self,
713 _table_ref: &TableReference,
714 _node: Arc<dyn datafusion_catalog::TableProvider>,
715 _buf: &mut Vec<u8>,
716 ) -> datafusion_common::Result<()> {
717 not_impl_err!("Method not implemented")
718 }
719
720 fn try_decode_file_format(
721 &self,
722 __buf: &[u8],
723 __ctx: &TaskContext,
724 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
725 Ok(Arc::new(ArrowFormatFactory::new()))
726 }
727
728 fn try_encode_file_format(
729 &self,
730 __buf: &mut Vec<u8>,
731 __node: Arc<dyn FileFormatFactory>,
732 ) -> datafusion_common::Result<()> {
733 Ok(())
734 }
735}
736
/// A [`LogicalExtensionCodec`] for the Avro file format. No options are
/// carried over the wire; plan/table-provider encoding is unsupported.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
739
740impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
742 fn try_decode(
743 &self,
744 _buf: &[u8],
745 _inputs: &[datafusion_expr::LogicalPlan],
746 _ctx: &TaskContext,
747 ) -> datafusion_common::Result<datafusion_expr::Extension> {
748 not_impl_err!("Method not implemented")
749 }
750
751 fn try_encode(
752 &self,
753 _node: &datafusion_expr::Extension,
754 _buf: &mut Vec<u8>,
755 ) -> datafusion_common::Result<()> {
756 not_impl_err!("Method not implemented")
757 }
758
759 fn try_decode_table_provider(
760 &self,
761 _buf: &[u8],
762 _table_ref: &TableReference,
763 _schema: arrow::datatypes::SchemaRef,
764 _cts: &TaskContext,
765 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
766 not_impl_err!("Method not implemented")
767 }
768
769 fn try_encode_table_provider(
770 &self,
771 _table_ref: &TableReference,
772 _node: Arc<dyn datafusion_catalog::TableProvider>,
773 _buf: &mut Vec<u8>,
774 ) -> datafusion_common::Result<()> {
775 not_impl_err!("Method not implemented")
776 }
777
778 fn try_decode_file_format(
779 &self,
780 __buf: &[u8],
781 __ctx: &TaskContext,
782 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
783 Ok(Arc::new(ArrowFormatFactory::new()))
784 }
785
786 fn try_encode_file_format(
787 &self,
788 __buf: &mut Vec<u8>,
789 __node: Arc<dyn FileFormatFactory>,
790 ) -> datafusion_common::Result<()> {
791 Ok(())
792 }
793}