1use std::sync::Arc;
19
20use datafusion::{
21 config::{
22 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23 TableParquetOptions,
24 },
25 datasource::file_format::{
26 arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27 parquet::ParquetFormatFactory, FileFormatFactory,
28 },
29 prelude::SessionContext,
30};
31use datafusion_common::{
32 exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33 TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38 parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39 JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40 ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41 TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// Logical extension codec that serializes and deserializes [`CsvOptions`]
/// (via the protobuf message `CsvOptionsProto`) for CSV file formats.
/// Only the file-format hooks are implemented; plan/table-provider
/// (de)serialization returns "not implemented".
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50 fn from_factory(factory: &CsvFormatFactory) -> Self {
51 if let Some(options) = &factory.options {
52 CsvOptionsProto {
53 has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54 delimiter: vec![options.delimiter],
55 quote: vec![options.quote],
56 terminator: options.terminator.map_or(vec![], |v| vec![v]),
57 escape: options.escape.map_or(vec![], |v| vec![v]),
58 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59 compression: options.compression as i32,
60 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61 date_format: options.date_format.clone().unwrap_or_default(),
62 datetime_format: options.datetime_format.clone().unwrap_or_default(),
63 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64 timestamp_tz_format: options
65 .timestamp_tz_format
66 .clone()
67 .unwrap_or_default(),
68 time_format: options.time_format.clone().unwrap_or_default(),
69 null_value: options.null_value.clone().unwrap_or_default(),
70 null_regex: options.null_regex.clone().unwrap_or_default(),
71 comment: options.comment.map_or(vec![], |v| vec![v]),
72 newlines_in_values: options
73 .newlines_in_values
74 .map_or(vec![], |v| vec![v as u8]),
75 }
76 } else {
77 CsvOptionsProto::default()
78 }
79 }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83 fn from(proto: &CsvOptionsProto) -> Self {
84 CsvOptions {
85 has_header: if !proto.has_header.is_empty() {
86 Some(proto.has_header[0] != 0)
87 } else {
88 None
89 },
90 delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91 quote: proto.quote.first().copied().unwrap_or(b'"'),
92 terminator: if !proto.terminator.is_empty() {
93 Some(proto.terminator[0])
94 } else {
95 None
96 },
97 escape: if !proto.escape.is_empty() {
98 Some(proto.escape[0])
99 } else {
100 None
101 },
102 double_quote: if !proto.double_quote.is_empty() {
103 Some(proto.double_quote[0] != 0)
104 } else {
105 None
106 },
107 compression: match proto.compression {
108 0 => CompressionTypeVariant::GZIP,
109 1 => CompressionTypeVariant::BZIP2,
110 2 => CompressionTypeVariant::XZ,
111 3 => CompressionTypeVariant::ZSTD,
112 _ => CompressionTypeVariant::UNCOMPRESSED,
113 },
114 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115 date_format: if proto.date_format.is_empty() {
116 None
117 } else {
118 Some(proto.date_format.clone())
119 },
120 datetime_format: if proto.datetime_format.is_empty() {
121 None
122 } else {
123 Some(proto.datetime_format.clone())
124 },
125 timestamp_format: if proto.timestamp_format.is_empty() {
126 None
127 } else {
128 Some(proto.timestamp_format.clone())
129 },
130 timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131 None
132 } else {
133 Some(proto.timestamp_tz_format.clone())
134 },
135 time_format: if proto.time_format.is_empty() {
136 None
137 } else {
138 Some(proto.time_format.clone())
139 },
140 null_value: if proto.null_value.is_empty() {
141 None
142 } else {
143 Some(proto.null_value.clone())
144 },
145 null_regex: if proto.null_regex.is_empty() {
146 None
147 } else {
148 Some(proto.null_regex.clone())
149 },
150 comment: if !proto.comment.is_empty() {
151 Some(proto.comment[0])
152 } else {
153 None
154 },
155 newlines_in_values: if proto.newlines_in_values.is_empty() {
156 None
157 } else {
158 Some(proto.newlines_in_values[0] != 0)
159 },
160 }
161 }
162}
163
164impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166 fn try_decode(
167 &self,
168 _buf: &[u8],
169 _inputs: &[datafusion_expr::LogicalPlan],
170 _ctx: &SessionContext,
171 ) -> datafusion_common::Result<datafusion_expr::Extension> {
172 not_impl_err!("Method not implemented")
173 }
174
175 fn try_encode(
176 &self,
177 _node: &datafusion_expr::Extension,
178 _buf: &mut Vec<u8>,
179 ) -> datafusion_common::Result<()> {
180 not_impl_err!("Method not implemented")
181 }
182
183 fn try_decode_table_provider(
184 &self,
185 _buf: &[u8],
186 _table_ref: &TableReference,
187 _schema: arrow::datatypes::SchemaRef,
188 _ctx: &SessionContext,
189 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190 not_impl_err!("Method not implemented")
191 }
192
193 fn try_encode_table_provider(
194 &self,
195 _table_ref: &TableReference,
196 _node: Arc<dyn datafusion::datasource::TableProvider>,
197 _buf: &mut Vec<u8>,
198 ) -> datafusion_common::Result<()> {
199 not_impl_err!("Method not implemented")
200 }
201
202 fn try_decode_file_format(
203 &self,
204 buf: &[u8],
205 _ctx: &SessionContext,
206 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207 let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208 DataFusionError::Execution(format!("Failed to decode CsvOptionsProto: {e:?}"))
209 })?;
210 let options: CsvOptions = (&proto).into();
211 Ok(Arc::new(CsvFormatFactory {
212 options: Some(options),
213 }))
214 }
215
216 fn try_encode_file_format(
217 &self,
218 buf: &mut Vec<u8>,
219 node: Arc<dyn FileFormatFactory>,
220 ) -> datafusion_common::Result<()> {
221 let options =
222 if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
223 csv_factory.options.clone().unwrap_or_default()
224 } else {
225 return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
226 };
227
228 let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
229 options: Some(options),
230 });
231
232 proto.encode(buf).map_err(|e| {
233 DataFusionError::Execution(format!("Failed to encode CsvOptions: {e:?}"))
234 })?;
235
236 Ok(())
237 }
238}
239
240impl JsonOptionsProto {
241 fn from_factory(factory: &JsonFormatFactory) -> Self {
242 if let Some(options) = &factory.options {
243 JsonOptionsProto {
244 compression: options.compression as i32,
245 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
246 }
247 } else {
248 JsonOptionsProto::default()
249 }
250 }
251}
252
253impl From<&JsonOptionsProto> for JsonOptions {
254 fn from(proto: &JsonOptionsProto) -> Self {
255 JsonOptions {
256 compression: match proto.compression {
257 0 => CompressionTypeVariant::GZIP,
258 1 => CompressionTypeVariant::BZIP2,
259 2 => CompressionTypeVariant::XZ,
260 3 => CompressionTypeVariant::ZSTD,
261 _ => CompressionTypeVariant::UNCOMPRESSED,
262 },
263 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
264 }
265 }
266}
267
/// Logical extension codec that serializes and deserializes [`JsonOptions`]
/// (via `JsonOptionsProto`) for newline-delimited JSON file formats.
/// Only the file-format hooks are implemented.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
270
271impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
273 fn try_decode(
274 &self,
275 _buf: &[u8],
276 _inputs: &[datafusion_expr::LogicalPlan],
277 _ctx: &SessionContext,
278 ) -> datafusion_common::Result<datafusion_expr::Extension> {
279 not_impl_err!("Method not implemented")
280 }
281
282 fn try_encode(
283 &self,
284 _node: &datafusion_expr::Extension,
285 _buf: &mut Vec<u8>,
286 ) -> datafusion_common::Result<()> {
287 not_impl_err!("Method not implemented")
288 }
289
290 fn try_decode_table_provider(
291 &self,
292 _buf: &[u8],
293 _table_ref: &TableReference,
294 _schema: arrow::datatypes::SchemaRef,
295 _ctx: &SessionContext,
296 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
297 not_impl_err!("Method not implemented")
298 }
299
300 fn try_encode_table_provider(
301 &self,
302 _table_ref: &TableReference,
303 _node: Arc<dyn datafusion::datasource::TableProvider>,
304 _buf: &mut Vec<u8>,
305 ) -> datafusion_common::Result<()> {
306 not_impl_err!("Method not implemented")
307 }
308
309 fn try_decode_file_format(
310 &self,
311 buf: &[u8],
312 _ctx: &SessionContext,
313 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
314 let proto = JsonOptionsProto::decode(buf).map_err(|e| {
315 DataFusionError::Execution(format!(
316 "Failed to decode JsonOptionsProto: {e:?}"
317 ))
318 })?;
319 let options: JsonOptions = (&proto).into();
320 Ok(Arc::new(JsonFormatFactory {
321 options: Some(options),
322 }))
323 }
324
325 fn try_encode_file_format(
326 &self,
327 buf: &mut Vec<u8>,
328 node: Arc<dyn FileFormatFactory>,
329 ) -> datafusion_common::Result<()> {
330 let options = if let Some(json_factory) =
331 node.as_any().downcast_ref::<JsonFormatFactory>()
332 {
333 json_factory.options.clone().unwrap_or_default()
334 } else {
335 return Err(DataFusionError::Execution(
336 "Unsupported FileFormatFactory type".to_string(),
337 ));
338 };
339
340 let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
341 options: Some(options),
342 });
343
344 proto.encode(buf).map_err(|e| {
345 DataFusionError::Execution(format!("Failed to encode JsonOptions: {e:?}"))
346 })?;
347
348 Ok(())
349 }
350}
351
impl TableParquetOptionsProto {
    /// Serialize the [`TableParquetOptions`] held by a [`ParquetFormatFactory`]
    /// into the protobuf message: global writer/reader options, per-column
    /// overrides, and key/value metadata. A factory without options yields
    /// the default proto.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        // NOTE(review): allow(deprecated) appears to cover at least one
        // deprecated option field (likely max_statistics_size) — confirm upstream.
        #[allow(deprecated)] TableParquetOptionsProto {
            // Optional fields are encoded as protobuf oneof wrappers
            // (`*_opt`), so `None` is representable on the wire.
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| {
                    parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
            }),
            // Per-column overrides, keyed by column name.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                        max_statistics_size_opt: options.max_statistics_size.map(|size| {
                            parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32)
                        }),
                    })
                }
            }).collect(),
            // Only (key, Some(value)) pairs survive the round-trip;
            // entries whose value is None are dropped here.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
458
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Rebuild the global [`ParquetOptions`] from the protobuf message,
    /// unwrapping each `*_opt` oneof wrapper back to an `Option` and
    /// widening/narrowing the `u64` wire integers back to `usize`.
    fn from(proto: &ParquetOptionsProto) -> Self {
        // NOTE(review): allow(deprecated) appears to cover at least one
        // deprecated option field (likely max_statistics_size) — confirm upstream.
        #[allow(deprecated)] ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt {
                parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize,
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
        }
    }
}
519
impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
    /// Rebuild per-column [`ParquetColumnOptions`] from the protobuf
    /// message. Each single-variant oneof wrapper is destructured directly
    /// in the closure pattern to recover the inner value.
    fn from(proto: ParquetColumnOptionsProto) -> Self {
        // NOTE(review): allow(deprecated) appears to cover at least one
        // deprecated option field (likely max_statistics_size) — confirm upstream.
        #[allow(deprecated)] ParquetColumnOptions {
            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
            ),
            encoding: proto
                .encoding_opt
                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
            dictionary_enabled: proto.dictionary_enabled_opt.map(
                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
            ),
            compression: proto
                .compression_opt
                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
            statistics_enabled: proto.statistics_enabled_opt.map(
                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
            ),
            bloom_filter_fpp: proto
                .bloom_filter_fpp_opt
                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
            bloom_filter_ndv: proto
                .bloom_filter_ndv_opt
                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
            max_statistics_size: proto.max_statistics_size_opt.map(
                |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| {
                    v as usize
                },
            ),
        }
    }
}
553
554impl From<&TableParquetOptionsProto> for TableParquetOptions {
555 fn from(proto: &TableParquetOptionsProto) -> Self {
556 TableParquetOptions {
557 global: proto
558 .global
559 .as_ref()
560 .map(ParquetOptions::from)
561 .unwrap_or_default(),
562 column_specific_options: proto
563 .column_specific_options
564 .iter()
565 .map(|parquet_column_options| {
566 (
567 parquet_column_options.column_name.clone(),
568 ParquetColumnOptions::from(
569 parquet_column_options.options.clone().unwrap_or_default(),
570 ),
571 )
572 })
573 .collect(),
574 key_value_metadata: proto
575 .key_value_metadata
576 .iter()
577 .map(|(k, v)| (k.clone(), Some(v.clone())))
578 .collect(),
579 crypto: Default::default(),
580 }
581 }
582}
583
/// Logical extension codec that serializes and deserializes
/// [`TableParquetOptions`] (via `TableParquetOptionsProto`) for Parquet
/// file formats. Only the file-format hooks are implemented.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
586
587impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
589 fn try_decode(
590 &self,
591 _buf: &[u8],
592 _inputs: &[datafusion_expr::LogicalPlan],
593 _ctx: &SessionContext,
594 ) -> datafusion_common::Result<datafusion_expr::Extension> {
595 not_impl_err!("Method not implemented")
596 }
597
598 fn try_encode(
599 &self,
600 _node: &datafusion_expr::Extension,
601 _buf: &mut Vec<u8>,
602 ) -> datafusion_common::Result<()> {
603 not_impl_err!("Method not implemented")
604 }
605
606 fn try_decode_table_provider(
607 &self,
608 _buf: &[u8],
609 _table_ref: &TableReference,
610 _schema: arrow::datatypes::SchemaRef,
611 _ctx: &SessionContext,
612 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
613 not_impl_err!("Method not implemented")
614 }
615
616 fn try_encode_table_provider(
617 &self,
618 _table_ref: &TableReference,
619 _node: Arc<dyn datafusion::datasource::TableProvider>,
620 _buf: &mut Vec<u8>,
621 ) -> datafusion_common::Result<()> {
622 not_impl_err!("Method not implemented")
623 }
624
625 fn try_decode_file_format(
626 &self,
627 buf: &[u8],
628 _ctx: &SessionContext,
629 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
630 let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
631 DataFusionError::Execution(format!(
632 "Failed to decode TableParquetOptionsProto: {e:?}"
633 ))
634 })?;
635 let options: TableParquetOptions = (&proto).into();
636 Ok(Arc::new(ParquetFormatFactory {
637 options: Some(options),
638 }))
639 }
640
641 fn try_encode_file_format(
642 &self,
643 buf: &mut Vec<u8>,
644 node: Arc<dyn FileFormatFactory>,
645 ) -> datafusion_common::Result<()> {
646 let options = if let Some(parquet_factory) =
647 node.as_any().downcast_ref::<ParquetFormatFactory>()
648 {
649 parquet_factory.options.clone().unwrap_or_default()
650 } else {
651 return Err(DataFusionError::Execution(
652 "Unsupported FileFormatFactory type".to_string(),
653 ));
654 };
655
656 let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
657 options: Some(options),
658 });
659
660 proto.encode(buf).map_err(|e| {
661 DataFusionError::Execution(format!(
662 "Failed to encode TableParquetOptionsProto: {e:?}"
663 ))
664 })?;
665
666 Ok(())
667 }
668}
669
/// Logical extension codec for the Arrow file format. This codec carries
/// no options on the wire: encoding is a no-op and decoding returns a
/// default [`ArrowFormatFactory`].
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
672
673impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
675 fn try_decode(
676 &self,
677 _buf: &[u8],
678 _inputs: &[datafusion_expr::LogicalPlan],
679 _ctx: &SessionContext,
680 ) -> datafusion_common::Result<datafusion_expr::Extension> {
681 not_impl_err!("Method not implemented")
682 }
683
684 fn try_encode(
685 &self,
686 _node: &datafusion_expr::Extension,
687 _buf: &mut Vec<u8>,
688 ) -> datafusion_common::Result<()> {
689 not_impl_err!("Method not implemented")
690 }
691
692 fn try_decode_table_provider(
693 &self,
694 _buf: &[u8],
695 _table_ref: &TableReference,
696 _schema: arrow::datatypes::SchemaRef,
697 _ctx: &SessionContext,
698 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
699 not_impl_err!("Method not implemented")
700 }
701
702 fn try_encode_table_provider(
703 &self,
704 _table_ref: &TableReference,
705 _node: Arc<dyn datafusion::datasource::TableProvider>,
706 _buf: &mut Vec<u8>,
707 ) -> datafusion_common::Result<()> {
708 not_impl_err!("Method not implemented")
709 }
710
711 fn try_decode_file_format(
712 &self,
713 __buf: &[u8],
714 __ctx: &SessionContext,
715 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
716 Ok(Arc::new(ArrowFormatFactory::new()))
717 }
718
719 fn try_encode_file_format(
720 &self,
721 __buf: &mut Vec<u8>,
722 __node: Arc<dyn FileFormatFactory>,
723 ) -> datafusion_common::Result<()> {
724 Ok(())
725 }
726}
727
/// Logical extension codec for the Avro file format. Like the Arrow codec,
/// it serializes no options; note that its decode path currently returns an
/// [`ArrowFormatFactory`] (see the impl below) — presumably a placeholder,
/// to be confirmed.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
730
731impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
733 fn try_decode(
734 &self,
735 _buf: &[u8],
736 _inputs: &[datafusion_expr::LogicalPlan],
737 _ctx: &SessionContext,
738 ) -> datafusion_common::Result<datafusion_expr::Extension> {
739 not_impl_err!("Method not implemented")
740 }
741
742 fn try_encode(
743 &self,
744 _node: &datafusion_expr::Extension,
745 _buf: &mut Vec<u8>,
746 ) -> datafusion_common::Result<()> {
747 not_impl_err!("Method not implemented")
748 }
749
750 fn try_decode_table_provider(
751 &self,
752 _buf: &[u8],
753 _table_ref: &TableReference,
754 _schema: arrow::datatypes::SchemaRef,
755 _cts: &SessionContext,
756 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
757 not_impl_err!("Method not implemented")
758 }
759
760 fn try_encode_table_provider(
761 &self,
762 _table_ref: &TableReference,
763 _node: Arc<dyn datafusion::datasource::TableProvider>,
764 _buf: &mut Vec<u8>,
765 ) -> datafusion_common::Result<()> {
766 not_impl_err!("Method not implemented")
767 }
768
769 fn try_decode_file_format(
770 &self,
771 __buf: &[u8],
772 __ctx: &SessionContext,
773 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
774 Ok(Arc::new(ArrowFormatFactory::new()))
775 }
776
777 fn try_encode_file_format(
778 &self,
779 __buf: &mut Vec<u8>,
780 __node: Arc<dyn FileFormatFactory>,
781 ) -> datafusion_common::Result<()> {
782 Ok(())
783 }
784}