// datafusion_proto/logical_plan/file_formats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::{
21    config::{
22        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23        TableParquetOptions,
24    },
25    datasource::file_format::{
26        arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27        parquet::ParquetFormatFactory, FileFormatFactory,
28    },
29    prelude::SessionContext,
30};
31use datafusion_common::{
32    exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33    TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38    parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39    JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40    ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41    TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// Logical extension codec that serializes and deserializes [`CsvOptions`]
/// so CSV file-format configuration can round-trip through protobuf.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50    fn from_factory(factory: &CsvFormatFactory) -> Self {
51        if let Some(options) = &factory.options {
52            CsvOptionsProto {
53                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54                delimiter: vec![options.delimiter],
55                quote: vec![options.quote],
56                terminator: options.terminator.map_or(vec![], |v| vec![v]),
57                escape: options.escape.map_or(vec![], |v| vec![v]),
58                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59                compression: options.compression as i32,
60                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61                date_format: options.date_format.clone().unwrap_or_default(),
62                datetime_format: options.datetime_format.clone().unwrap_or_default(),
63                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64                timestamp_tz_format: options
65                    .timestamp_tz_format
66                    .clone()
67                    .unwrap_or_default(),
68                time_format: options.time_format.clone().unwrap_or_default(),
69                null_value: options.null_value.clone().unwrap_or_default(),
70                null_regex: options.null_regex.clone().unwrap_or_default(),
71                comment: options.comment.map_or(vec![], |v| vec![v]),
72                newlines_in_values: options
73                    .newlines_in_values
74                    .map_or(vec![], |v| vec![v as u8]),
75            }
76        } else {
77            CsvOptionsProto::default()
78        }
79    }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83    fn from(proto: &CsvOptionsProto) -> Self {
84        CsvOptions {
85            has_header: if !proto.has_header.is_empty() {
86                Some(proto.has_header[0] != 0)
87            } else {
88                None
89            },
90            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91            quote: proto.quote.first().copied().unwrap_or(b'"'),
92            terminator: if !proto.terminator.is_empty() {
93                Some(proto.terminator[0])
94            } else {
95                None
96            },
97            escape: if !proto.escape.is_empty() {
98                Some(proto.escape[0])
99            } else {
100                None
101            },
102            double_quote: if !proto.double_quote.is_empty() {
103                Some(proto.double_quote[0] != 0)
104            } else {
105                None
106            },
107            compression: match proto.compression {
108                0 => CompressionTypeVariant::GZIP,
109                1 => CompressionTypeVariant::BZIP2,
110                2 => CompressionTypeVariant::XZ,
111                3 => CompressionTypeVariant::ZSTD,
112                _ => CompressionTypeVariant::UNCOMPRESSED,
113            },
114            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115            date_format: if proto.date_format.is_empty() {
116                None
117            } else {
118                Some(proto.date_format.clone())
119            },
120            datetime_format: if proto.datetime_format.is_empty() {
121                None
122            } else {
123                Some(proto.datetime_format.clone())
124            },
125            timestamp_format: if proto.timestamp_format.is_empty() {
126                None
127            } else {
128                Some(proto.timestamp_format.clone())
129            },
130            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131                None
132            } else {
133                Some(proto.timestamp_tz_format.clone())
134            },
135            time_format: if proto.time_format.is_empty() {
136                None
137            } else {
138                Some(proto.time_format.clone())
139            },
140            null_value: if proto.null_value.is_empty() {
141                None
142            } else {
143                Some(proto.null_value.clone())
144            },
145            null_regex: if proto.null_regex.is_empty() {
146                None
147            } else {
148                Some(proto.null_regex.clone())
149            },
150            comment: if !proto.comment.is_empty() {
151                Some(proto.comment[0])
152            } else {
153                None
154            },
155            newlines_in_values: if proto.newlines_in_values.is_empty() {
156                None
157            } else {
158                Some(proto.newlines_in_values[0] != 0)
159            },
160        }
161    }
162}
163
164// TODO! This is a placeholder for now and needs to be implemented for real.
165impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166    fn try_decode(
167        &self,
168        _buf: &[u8],
169        _inputs: &[datafusion_expr::LogicalPlan],
170        _ctx: &SessionContext,
171    ) -> datafusion_common::Result<datafusion_expr::Extension> {
172        not_impl_err!("Method not implemented")
173    }
174
175    fn try_encode(
176        &self,
177        _node: &datafusion_expr::Extension,
178        _buf: &mut Vec<u8>,
179    ) -> datafusion_common::Result<()> {
180        not_impl_err!("Method not implemented")
181    }
182
183    fn try_decode_table_provider(
184        &self,
185        _buf: &[u8],
186        _table_ref: &TableReference,
187        _schema: arrow::datatypes::SchemaRef,
188        _ctx: &SessionContext,
189    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190        not_impl_err!("Method not implemented")
191    }
192
193    fn try_encode_table_provider(
194        &self,
195        _table_ref: &TableReference,
196        _node: Arc<dyn datafusion::datasource::TableProvider>,
197        _buf: &mut Vec<u8>,
198    ) -> datafusion_common::Result<()> {
199        not_impl_err!("Method not implemented")
200    }
201
202    fn try_decode_file_format(
203        &self,
204        buf: &[u8],
205        _ctx: &SessionContext,
206    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208            DataFusionError::Execution(format!("Failed to decode CsvOptionsProto: {e:?}"))
209        })?;
210        let options: CsvOptions = (&proto).into();
211        Ok(Arc::new(CsvFormatFactory {
212            options: Some(options),
213        }))
214    }
215
216    fn try_encode_file_format(
217        &self,
218        buf: &mut Vec<u8>,
219        node: Arc<dyn FileFormatFactory>,
220    ) -> datafusion_common::Result<()> {
221        let options =
222            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
223                csv_factory.options.clone().unwrap_or_default()
224            } else {
225                return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
226            };
227
228        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
229            options: Some(options),
230        });
231
232        proto.encode(buf).map_err(|e| {
233            DataFusionError::Execution(format!("Failed to encode CsvOptions: {e:?}"))
234        })?;
235
236        Ok(())
237    }
238}
239
240impl JsonOptionsProto {
241    fn from_factory(factory: &JsonFormatFactory) -> Self {
242        if let Some(options) = &factory.options {
243            JsonOptionsProto {
244                compression: options.compression as i32,
245                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
246            }
247        } else {
248            JsonOptionsProto::default()
249        }
250    }
251}
252
253impl From<&JsonOptionsProto> for JsonOptions {
254    fn from(proto: &JsonOptionsProto) -> Self {
255        JsonOptions {
256            compression: match proto.compression {
257                0 => CompressionTypeVariant::GZIP,
258                1 => CompressionTypeVariant::BZIP2,
259                2 => CompressionTypeVariant::XZ,
260                3 => CompressionTypeVariant::ZSTD,
261                _ => CompressionTypeVariant::UNCOMPRESSED,
262            },
263            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
264        }
265    }
266}
267
/// Logical extension codec that serializes and deserializes [`JsonOptions`]
/// so JSON file-format configuration can round-trip through protobuf.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
270
271// TODO! This is a placeholder for now and needs to be implemented for real.
272impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
273    fn try_decode(
274        &self,
275        _buf: &[u8],
276        _inputs: &[datafusion_expr::LogicalPlan],
277        _ctx: &SessionContext,
278    ) -> datafusion_common::Result<datafusion_expr::Extension> {
279        not_impl_err!("Method not implemented")
280    }
281
282    fn try_encode(
283        &self,
284        _node: &datafusion_expr::Extension,
285        _buf: &mut Vec<u8>,
286    ) -> datafusion_common::Result<()> {
287        not_impl_err!("Method not implemented")
288    }
289
290    fn try_decode_table_provider(
291        &self,
292        _buf: &[u8],
293        _table_ref: &TableReference,
294        _schema: arrow::datatypes::SchemaRef,
295        _ctx: &SessionContext,
296    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
297        not_impl_err!("Method not implemented")
298    }
299
300    fn try_encode_table_provider(
301        &self,
302        _table_ref: &TableReference,
303        _node: Arc<dyn datafusion::datasource::TableProvider>,
304        _buf: &mut Vec<u8>,
305    ) -> datafusion_common::Result<()> {
306        not_impl_err!("Method not implemented")
307    }
308
309    fn try_decode_file_format(
310        &self,
311        buf: &[u8],
312        _ctx: &SessionContext,
313    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
314        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
315            DataFusionError::Execution(format!(
316                "Failed to decode JsonOptionsProto: {e:?}"
317            ))
318        })?;
319        let options: JsonOptions = (&proto).into();
320        Ok(Arc::new(JsonFormatFactory {
321            options: Some(options),
322        }))
323    }
324
325    fn try_encode_file_format(
326        &self,
327        buf: &mut Vec<u8>,
328        node: Arc<dyn FileFormatFactory>,
329    ) -> datafusion_common::Result<()> {
330        let options = if let Some(json_factory) =
331            node.as_any().downcast_ref::<JsonFormatFactory>()
332        {
333            json_factory.options.clone().unwrap_or_default()
334        } else {
335            return Err(DataFusionError::Execution(
336                "Unsupported FileFormatFactory type".to_string(),
337            ));
338        };
339
340        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
341            options: Some(options),
342        });
343
344        proto.encode(buf).map_err(|e| {
345            DataFusionError::Execution(format!("Failed to encode JsonOptions: {e:?}"))
346        })?;
347
348        Ok(())
349    }
350}
351
impl TableParquetOptionsProto {
    /// Build the protobuf message for the options carried by a Parquet format
    /// factory: the global writer/reader options, per-column overrides, and
    /// key/value metadata. A factory with no options produces the default
    /// message.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        #[allow(deprecated)] // max_statistics_size
        TableParquetOptionsProto {
            // Optional scalar settings are wrapped in their proto `oneof`
            // wrappers; usize fields are widened to u64 for the wire.
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| {
                    parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
            }),
            // One entry per column that overrides the global options.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                        max_statistics_size_opt: options.max_statistics_size.map(|size| {
                            parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32)
                        }),
                    })
                }
            }).collect(),
            // Entries whose value is None are dropped on the wire; the proto
            // map cannot represent a key with an absent value.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
458
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Reconstruct the global [`ParquetOptions`] from their protobuf encoding.
    ///
    /// Each `*_opt` oneof wrapper is unwrapped back to an `Option`; u64 wire
    /// fields are narrowed back to usize (assumes values fit the platform's
    /// usize, i.e. they originated from a usize on a compatible platform).
    fn from(proto: &ParquetOptionsProto) -> Self {
        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt {
                parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize,
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
        }
    }
}
519
520impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
521    fn from(proto: ParquetColumnOptionsProto) -> Self {
522        #[allow(deprecated)] // max_statistics_size
523        ParquetColumnOptions {
524            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
525                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
526            ),
527            encoding: proto
528                .encoding_opt
529                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
530            dictionary_enabled: proto.dictionary_enabled_opt.map(
531                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
532            ),
533            compression: proto
534                .compression_opt
535                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
536            statistics_enabled: proto.statistics_enabled_opt.map(
537                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
538            ),
539            bloom_filter_fpp: proto
540                .bloom_filter_fpp_opt
541                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
542            bloom_filter_ndv: proto
543                .bloom_filter_ndv_opt
544                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
545            max_statistics_size: proto.max_statistics_size_opt.map(
546                |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| {
547                    v as usize
548                },
549            ),
550        }
551    }
552}
553
554impl From<&TableParquetOptionsProto> for TableParquetOptions {
555    fn from(proto: &TableParquetOptionsProto) -> Self {
556        TableParquetOptions {
557            global: proto
558                .global
559                .as_ref()
560                .map(ParquetOptions::from)
561                .unwrap_or_default(),
562            column_specific_options: proto
563                .column_specific_options
564                .iter()
565                .map(|parquet_column_options| {
566                    (
567                        parquet_column_options.column_name.clone(),
568                        ParquetColumnOptions::from(
569                            parquet_column_options.options.clone().unwrap_or_default(),
570                        ),
571                    )
572                })
573                .collect(),
574            key_value_metadata: proto
575                .key_value_metadata
576                .iter()
577                .map(|(k, v)| (k.clone(), Some(v.clone())))
578                .collect(),
579            crypto: Default::default(),
580        }
581    }
582}
583
/// Logical extension codec that serializes and deserializes
/// [`TableParquetOptions`] so Parquet file-format configuration can
/// round-trip through protobuf.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
586
587// TODO! This is a placeholder for now and needs to be implemented for real.
588impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
589    fn try_decode(
590        &self,
591        _buf: &[u8],
592        _inputs: &[datafusion_expr::LogicalPlan],
593        _ctx: &SessionContext,
594    ) -> datafusion_common::Result<datafusion_expr::Extension> {
595        not_impl_err!("Method not implemented")
596    }
597
598    fn try_encode(
599        &self,
600        _node: &datafusion_expr::Extension,
601        _buf: &mut Vec<u8>,
602    ) -> datafusion_common::Result<()> {
603        not_impl_err!("Method not implemented")
604    }
605
606    fn try_decode_table_provider(
607        &self,
608        _buf: &[u8],
609        _table_ref: &TableReference,
610        _schema: arrow::datatypes::SchemaRef,
611        _ctx: &SessionContext,
612    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
613        not_impl_err!("Method not implemented")
614    }
615
616    fn try_encode_table_provider(
617        &self,
618        _table_ref: &TableReference,
619        _node: Arc<dyn datafusion::datasource::TableProvider>,
620        _buf: &mut Vec<u8>,
621    ) -> datafusion_common::Result<()> {
622        not_impl_err!("Method not implemented")
623    }
624
625    fn try_decode_file_format(
626        &self,
627        buf: &[u8],
628        _ctx: &SessionContext,
629    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
630        let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
631            DataFusionError::Execution(format!(
632                "Failed to decode TableParquetOptionsProto: {e:?}"
633            ))
634        })?;
635        let options: TableParquetOptions = (&proto).into();
636        Ok(Arc::new(ParquetFormatFactory {
637            options: Some(options),
638        }))
639    }
640
641    fn try_encode_file_format(
642        &self,
643        buf: &mut Vec<u8>,
644        node: Arc<dyn FileFormatFactory>,
645    ) -> datafusion_common::Result<()> {
646        let options = if let Some(parquet_factory) =
647            node.as_any().downcast_ref::<ParquetFormatFactory>()
648        {
649            parquet_factory.options.clone().unwrap_or_default()
650        } else {
651            return Err(DataFusionError::Execution(
652                "Unsupported FileFormatFactory type".to_string(),
653            ));
654        };
655
656        let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
657            options: Some(options),
658        });
659
660        proto.encode(buf).map_err(|e| {
661            DataFusionError::Execution(format!(
662                "Failed to encode TableParquetOptionsProto: {e:?}"
663            ))
664        })?;
665
666        Ok(())
667    }
668}
669
/// Logical extension codec for the Arrow IPC file format. The Arrow format
/// factory carries no options, so (de)serialization is trivial.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
672
673// TODO! This is a placeholder for now and needs to be implemented for real.
674impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
675    fn try_decode(
676        &self,
677        _buf: &[u8],
678        _inputs: &[datafusion_expr::LogicalPlan],
679        _ctx: &SessionContext,
680    ) -> datafusion_common::Result<datafusion_expr::Extension> {
681        not_impl_err!("Method not implemented")
682    }
683
684    fn try_encode(
685        &self,
686        _node: &datafusion_expr::Extension,
687        _buf: &mut Vec<u8>,
688    ) -> datafusion_common::Result<()> {
689        not_impl_err!("Method not implemented")
690    }
691
692    fn try_decode_table_provider(
693        &self,
694        _buf: &[u8],
695        _table_ref: &TableReference,
696        _schema: arrow::datatypes::SchemaRef,
697        _ctx: &SessionContext,
698    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
699        not_impl_err!("Method not implemented")
700    }
701
702    fn try_encode_table_provider(
703        &self,
704        _table_ref: &TableReference,
705        _node: Arc<dyn datafusion::datasource::TableProvider>,
706        _buf: &mut Vec<u8>,
707    ) -> datafusion_common::Result<()> {
708        not_impl_err!("Method not implemented")
709    }
710
711    fn try_decode_file_format(
712        &self,
713        __buf: &[u8],
714        __ctx: &SessionContext,
715    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
716        Ok(Arc::new(ArrowFormatFactory::new()))
717    }
718
719    fn try_encode_file_format(
720        &self,
721        __buf: &mut Vec<u8>,
722        __node: Arc<dyn FileFormatFactory>,
723    ) -> datafusion_common::Result<()> {
724        Ok(())
725    }
726}
727
/// Logical extension codec for the Avro file format (placeholder — see the
/// TODO on the trait impl below).
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
730
731// TODO! This is a placeholder for now and needs to be implemented for real.
732impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
733    fn try_decode(
734        &self,
735        _buf: &[u8],
736        _inputs: &[datafusion_expr::LogicalPlan],
737        _ctx: &SessionContext,
738    ) -> datafusion_common::Result<datafusion_expr::Extension> {
739        not_impl_err!("Method not implemented")
740    }
741
742    fn try_encode(
743        &self,
744        _node: &datafusion_expr::Extension,
745        _buf: &mut Vec<u8>,
746    ) -> datafusion_common::Result<()> {
747        not_impl_err!("Method not implemented")
748    }
749
750    fn try_decode_table_provider(
751        &self,
752        _buf: &[u8],
753        _table_ref: &TableReference,
754        _schema: arrow::datatypes::SchemaRef,
755        _cts: &SessionContext,
756    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
757        not_impl_err!("Method not implemented")
758    }
759
760    fn try_encode_table_provider(
761        &self,
762        _table_ref: &TableReference,
763        _node: Arc<dyn datafusion::datasource::TableProvider>,
764        _buf: &mut Vec<u8>,
765    ) -> datafusion_common::Result<()> {
766        not_impl_err!("Method not implemented")
767    }
768
769    fn try_decode_file_format(
770        &self,
771        __buf: &[u8],
772        __ctx: &SessionContext,
773    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
774        Ok(Arc::new(ArrowFormatFactory::new()))
775    }
776
777    fn try_encode_file_format(
778        &self,
779        __buf: &mut Vec<u8>,
780        __node: Arc<dyn FileFormatFactory>,
781    ) -> datafusion_common::Result<()> {
782        Ok(())
783    }
784}