// datafusion_proto/logical_plan/file_formats.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
use std::sync::Arc;

use datafusion::{
    config::{
        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
        TableParquetOptions,
    },
    datasource::file_format::{
        arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
        parquet::ParquetFormatFactory, FileFormatFactory,
    },
    prelude::SessionContext,
};
use datafusion_common::{
    exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
    TableReference,
};
use prost::Message;

use crate::protobuf::{
    parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
    JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
    ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
    TableParquetOptions as TableParquetOptionsProto,
};

use super::LogicalExtensionCodec;
45
/// Logical extension codec that (de)serializes [`CsvFormatFactory`] options
/// via the `CsvOptionsProto` protobuf message. Plan/table-provider methods
/// are currently unimplemented placeholders.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50    fn from_factory(factory: &CsvFormatFactory) -> Self {
51        if let Some(options) = &factory.options {
52            CsvOptionsProto {
53                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54                delimiter: vec![options.delimiter],
55                quote: vec![options.quote],
56                terminator: options.terminator.map_or(vec![], |v| vec![v]),
57                escape: options.escape.map_or(vec![], |v| vec![v]),
58                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59                compression: options.compression as i32,
60                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61                date_format: options.date_format.clone().unwrap_or_default(),
62                datetime_format: options.datetime_format.clone().unwrap_or_default(),
63                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64                timestamp_tz_format: options
65                    .timestamp_tz_format
66                    .clone()
67                    .unwrap_or_default(),
68                time_format: options.time_format.clone().unwrap_or_default(),
69                null_value: options.null_value.clone().unwrap_or_default(),
70                null_regex: options.null_regex.clone().unwrap_or_default(),
71                comment: options.comment.map_or(vec![], |v| vec![v]),
72                newlines_in_values: options
73                    .newlines_in_values
74                    .map_or(vec![], |v| vec![v as u8]),
75            }
76        } else {
77            CsvOptionsProto::default()
78        }
79    }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83    fn from(proto: &CsvOptionsProto) -> Self {
84        CsvOptions {
85            has_header: if !proto.has_header.is_empty() {
86                Some(proto.has_header[0] != 0)
87            } else {
88                None
89            },
90            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91            quote: proto.quote.first().copied().unwrap_or(b'"'),
92            terminator: if !proto.terminator.is_empty() {
93                Some(proto.terminator[0])
94            } else {
95                None
96            },
97            escape: if !proto.escape.is_empty() {
98                Some(proto.escape[0])
99            } else {
100                None
101            },
102            double_quote: if !proto.double_quote.is_empty() {
103                Some(proto.double_quote[0] != 0)
104            } else {
105                None
106            },
107            compression: match proto.compression {
108                0 => CompressionTypeVariant::GZIP,
109                1 => CompressionTypeVariant::BZIP2,
110                2 => CompressionTypeVariant::XZ,
111                3 => CompressionTypeVariant::ZSTD,
112                _ => CompressionTypeVariant::UNCOMPRESSED,
113            },
114            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115            date_format: if proto.date_format.is_empty() {
116                None
117            } else {
118                Some(proto.date_format.clone())
119            },
120            datetime_format: if proto.datetime_format.is_empty() {
121                None
122            } else {
123                Some(proto.datetime_format.clone())
124            },
125            timestamp_format: if proto.timestamp_format.is_empty() {
126                None
127            } else {
128                Some(proto.timestamp_format.clone())
129            },
130            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131                None
132            } else {
133                Some(proto.timestamp_tz_format.clone())
134            },
135            time_format: if proto.time_format.is_empty() {
136                None
137            } else {
138                Some(proto.time_format.clone())
139            },
140            null_value: if proto.null_value.is_empty() {
141                None
142            } else {
143                Some(proto.null_value.clone())
144            },
145            null_regex: if proto.null_regex.is_empty() {
146                None
147            } else {
148                Some(proto.null_regex.clone())
149            },
150            comment: if !proto.comment.is_empty() {
151                Some(proto.comment[0])
152            } else {
153                None
154            },
155            newlines_in_values: if proto.newlines_in_values.is_empty() {
156                None
157            } else {
158                Some(proto.newlines_in_values[0] != 0)
159            },
160        }
161    }
162}
163
164// TODO! This is a placeholder for now and needs to be implemented for real.
165impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166    fn try_decode(
167        &self,
168        _buf: &[u8],
169        _inputs: &[datafusion_expr::LogicalPlan],
170        _ctx: &SessionContext,
171    ) -> datafusion_common::Result<datafusion_expr::Extension> {
172        not_impl_err!("Method not implemented")
173    }
174
175    fn try_encode(
176        &self,
177        _node: &datafusion_expr::Extension,
178        _buf: &mut Vec<u8>,
179    ) -> datafusion_common::Result<()> {
180        not_impl_err!("Method not implemented")
181    }
182
183    fn try_decode_table_provider(
184        &self,
185        _buf: &[u8],
186        _table_ref: &TableReference,
187        _schema: arrow::datatypes::SchemaRef,
188        _ctx: &SessionContext,
189    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190        not_impl_err!("Method not implemented")
191    }
192
193    fn try_encode_table_provider(
194        &self,
195        _table_ref: &TableReference,
196        _node: Arc<dyn datafusion::datasource::TableProvider>,
197        _buf: &mut Vec<u8>,
198    ) -> datafusion_common::Result<()> {
199        not_impl_err!("Method not implemented")
200    }
201
202    fn try_decode_file_format(
203        &self,
204        buf: &[u8],
205        _ctx: &SessionContext,
206    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208            DataFusionError::Execution(format!("Failed to decode CsvOptionsProto: {e:?}"))
209        })?;
210        let options: CsvOptions = (&proto).into();
211        Ok(Arc::new(CsvFormatFactory {
212            options: Some(options),
213        }))
214    }
215
216    fn try_encode_file_format(
217        &self,
218        buf: &mut Vec<u8>,
219        node: Arc<dyn FileFormatFactory>,
220    ) -> datafusion_common::Result<()> {
221        let options =
222            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
223                csv_factory.options.clone().unwrap_or_default()
224            } else {
225                return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
226            };
227
228        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
229            options: Some(options),
230        });
231
232        proto.encode(buf).map_err(|e| {
233            DataFusionError::Execution(format!("Failed to encode CsvOptions: {e:?}"))
234        })?;
235
236        Ok(())
237    }
238}
239
240impl JsonOptionsProto {
241    fn from_factory(factory: &JsonFormatFactory) -> Self {
242        if let Some(options) = &factory.options {
243            JsonOptionsProto {
244                compression: options.compression as i32,
245                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
246            }
247        } else {
248            JsonOptionsProto::default()
249        }
250    }
251}
252
253impl From<&JsonOptionsProto> for JsonOptions {
254    fn from(proto: &JsonOptionsProto) -> Self {
255        JsonOptions {
256            compression: match proto.compression {
257                0 => CompressionTypeVariant::GZIP,
258                1 => CompressionTypeVariant::BZIP2,
259                2 => CompressionTypeVariant::XZ,
260                3 => CompressionTypeVariant::ZSTD,
261                _ => CompressionTypeVariant::UNCOMPRESSED,
262            },
263            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
264        }
265    }
266}
267
/// Logical extension codec that (de)serializes [`JsonFormatFactory`] options
/// via the `JsonOptionsProto` protobuf message. Plan/table-provider methods
/// are currently unimplemented placeholders.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
270
271// TODO! This is a placeholder for now and needs to be implemented for real.
272impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
273    fn try_decode(
274        &self,
275        _buf: &[u8],
276        _inputs: &[datafusion_expr::LogicalPlan],
277        _ctx: &SessionContext,
278    ) -> datafusion_common::Result<datafusion_expr::Extension> {
279        not_impl_err!("Method not implemented")
280    }
281
282    fn try_encode(
283        &self,
284        _node: &datafusion_expr::Extension,
285        _buf: &mut Vec<u8>,
286    ) -> datafusion_common::Result<()> {
287        not_impl_err!("Method not implemented")
288    }
289
290    fn try_decode_table_provider(
291        &self,
292        _buf: &[u8],
293        _table_ref: &TableReference,
294        _schema: arrow::datatypes::SchemaRef,
295        _ctx: &SessionContext,
296    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
297        not_impl_err!("Method not implemented")
298    }
299
300    fn try_encode_table_provider(
301        &self,
302        _table_ref: &TableReference,
303        _node: Arc<dyn datafusion::datasource::TableProvider>,
304        _buf: &mut Vec<u8>,
305    ) -> datafusion_common::Result<()> {
306        not_impl_err!("Method not implemented")
307    }
308
309    fn try_decode_file_format(
310        &self,
311        buf: &[u8],
312        _ctx: &SessionContext,
313    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
314        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
315            DataFusionError::Execution(format!(
316                "Failed to decode JsonOptionsProto: {e:?}"
317            ))
318        })?;
319        let options: JsonOptions = (&proto).into();
320        Ok(Arc::new(JsonFormatFactory {
321            options: Some(options),
322        }))
323    }
324
325    fn try_encode_file_format(
326        &self,
327        buf: &mut Vec<u8>,
328        node: Arc<dyn FileFormatFactory>,
329    ) -> datafusion_common::Result<()> {
330        let options = if let Some(json_factory) =
331            node.as_any().downcast_ref::<JsonFormatFactory>()
332        {
333            json_factory.options.clone().unwrap_or_default()
334        } else {
335            return Err(DataFusionError::Execution(
336                "Unsupported FileFormatFactory type".to_string(),
337            ));
338        };
339
340        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
341            options: Some(options),
342        });
343
344        proto.encode(buf).map_err(|e| {
345            DataFusionError::Execution(format!("Failed to encode JsonOptions: {e:?}"))
346        })?;
347
348        Ok(())
349    }
350}
351
impl TableParquetOptionsProto {
    /// Converts the [`TableParquetOptions`] held by a [`ParquetFormatFactory`]
    /// into their protobuf representation.
    ///
    /// A factory without options maps to the default (empty) proto message.
    /// Optional settings are wrapped in proto `oneof` wrapper enums so that
    /// "unset" survives a round trip; `None` leaves the wrapper absent.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        #[allow(deprecated)] // max_statistics_size
        TableParquetOptionsProto {
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| {
                    parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                // NOTE(review): the closure binding is named `compression` but
                // carries the coerce_int96 value — harmless, just misleading.
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
            }),
            // Per-column overrides: each (name, options) pair becomes one
            // ParquetColumnSpecificOptions entry with the same oneof-wrapping
            // scheme as the global options above.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                        max_statistics_size_opt: options.max_statistics_size.map(|size| {
                            parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32)
                        }),
                    })
                }
            }).collect(),
            // Entries whose value is `None` are dropped here: the proto map
            // only stores concrete string values.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
458
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Maps the proto message back to [`ParquetOptions`].
    ///
    /// Each proto `oneof` wrapper is unwrapped to the corresponding optional
    /// setting; wire-width integers (u64) are narrowed back to `usize`.
    fn from(proto: &ParquetOptionsProto) -> Self {
        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt {
                parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize,
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
        }
    }
}
519
520impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
521    fn from(proto: ParquetColumnOptionsProto) -> Self {
522        #[allow(deprecated)] // max_statistics_size
523        ParquetColumnOptions {
524            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
525                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
526            ),
527            encoding: proto
528                .encoding_opt
529                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
530            dictionary_enabled: proto.dictionary_enabled_opt.map(
531                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
532            ),
533            compression: proto
534                .compression_opt
535                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
536            statistics_enabled: proto.statistics_enabled_opt.map(
537                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
538            ),
539            bloom_filter_fpp: proto
540                .bloom_filter_fpp_opt
541                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
542            bloom_filter_ndv: proto
543                .bloom_filter_ndv_opt
544                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
545            max_statistics_size: proto.max_statistics_size_opt.map(
546                |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| {
547                    v as usize
548                },
549            ),
550        }
551    }
552}
553
554impl From<&TableParquetOptionsProto> for TableParquetOptions {
555    fn from(proto: &TableParquetOptionsProto) -> Self {
556        TableParquetOptions {
557            global: proto
558                .global
559                .as_ref()
560                .map(ParquetOptions::from)
561                .unwrap_or_default(),
562            column_specific_options: proto
563                .column_specific_options
564                .iter()
565                .map(|parquet_column_options| {
566                    (
567                        parquet_column_options.column_name.clone(),
568                        ParquetColumnOptions::from(
569                            parquet_column_options.options.clone().unwrap_or_default(),
570                        ),
571                    )
572                })
573                .collect(),
574            key_value_metadata: proto
575                .key_value_metadata
576                .iter()
577                .map(|(k, v)| (k.clone(), Some(v.clone())))
578                .collect(),
579        }
580    }
581}
582
/// Logical extension codec that (de)serializes [`ParquetFormatFactory`]
/// options via the `TableParquetOptionsProto` protobuf message.
/// Plan/table-provider methods are currently unimplemented placeholders.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
585
586// TODO! This is a placeholder for now and needs to be implemented for real.
587impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
588    fn try_decode(
589        &self,
590        _buf: &[u8],
591        _inputs: &[datafusion_expr::LogicalPlan],
592        _ctx: &SessionContext,
593    ) -> datafusion_common::Result<datafusion_expr::Extension> {
594        not_impl_err!("Method not implemented")
595    }
596
597    fn try_encode(
598        &self,
599        _node: &datafusion_expr::Extension,
600        _buf: &mut Vec<u8>,
601    ) -> datafusion_common::Result<()> {
602        not_impl_err!("Method not implemented")
603    }
604
605    fn try_decode_table_provider(
606        &self,
607        _buf: &[u8],
608        _table_ref: &TableReference,
609        _schema: arrow::datatypes::SchemaRef,
610        _ctx: &SessionContext,
611    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
612        not_impl_err!("Method not implemented")
613    }
614
615    fn try_encode_table_provider(
616        &self,
617        _table_ref: &TableReference,
618        _node: Arc<dyn datafusion::datasource::TableProvider>,
619        _buf: &mut Vec<u8>,
620    ) -> datafusion_common::Result<()> {
621        not_impl_err!("Method not implemented")
622    }
623
624    fn try_decode_file_format(
625        &self,
626        buf: &[u8],
627        _ctx: &SessionContext,
628    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
629        let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
630            DataFusionError::Execution(format!(
631                "Failed to decode TableParquetOptionsProto: {e:?}"
632            ))
633        })?;
634        let options: TableParquetOptions = (&proto).into();
635        Ok(Arc::new(ParquetFormatFactory {
636            options: Some(options),
637        }))
638    }
639
640    fn try_encode_file_format(
641        &self,
642        buf: &mut Vec<u8>,
643        node: Arc<dyn FileFormatFactory>,
644    ) -> datafusion_common::Result<()> {
645        let options = if let Some(parquet_factory) =
646            node.as_any().downcast_ref::<ParquetFormatFactory>()
647        {
648            parquet_factory.options.clone().unwrap_or_default()
649        } else {
650            return Err(DataFusionError::Execution(
651                "Unsupported FileFormatFactory type".to_string(),
652            ));
653        };
654
655        let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
656            options: Some(options),
657        });
658
659        proto.encode(buf).map_err(|e| {
660            DataFusionError::Execution(format!(
661                "Failed to encode TableParquetOptionsProto: {e:?}"
662            ))
663        })?;
664
665        Ok(())
666    }
667}
668
/// Logical extension codec for the Arrow IPC file format. This codec
/// serializes no options: encoding writes nothing and decoding returns a
/// default [`ArrowFormatFactory`]. Plan/table-provider methods are
/// currently unimplemented placeholders.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
671
672// TODO! This is a placeholder for now and needs to be implemented for real.
673impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
674    fn try_decode(
675        &self,
676        _buf: &[u8],
677        _inputs: &[datafusion_expr::LogicalPlan],
678        _ctx: &SessionContext,
679    ) -> datafusion_common::Result<datafusion_expr::Extension> {
680        not_impl_err!("Method not implemented")
681    }
682
683    fn try_encode(
684        &self,
685        _node: &datafusion_expr::Extension,
686        _buf: &mut Vec<u8>,
687    ) -> datafusion_common::Result<()> {
688        not_impl_err!("Method not implemented")
689    }
690
691    fn try_decode_table_provider(
692        &self,
693        _buf: &[u8],
694        _table_ref: &TableReference,
695        _schema: arrow::datatypes::SchemaRef,
696        _ctx: &SessionContext,
697    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
698        not_impl_err!("Method not implemented")
699    }
700
701    fn try_encode_table_provider(
702        &self,
703        _table_ref: &TableReference,
704        _node: Arc<dyn datafusion::datasource::TableProvider>,
705        _buf: &mut Vec<u8>,
706    ) -> datafusion_common::Result<()> {
707        not_impl_err!("Method not implemented")
708    }
709
710    fn try_decode_file_format(
711        &self,
712        __buf: &[u8],
713        __ctx: &SessionContext,
714    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
715        Ok(Arc::new(ArrowFormatFactory::new()))
716    }
717
718    fn try_encode_file_format(
719        &self,
720        __buf: &mut Vec<u8>,
721        __node: Arc<dyn FileFormatFactory>,
722    ) -> datafusion_common::Result<()> {
723        Ok(())
724    }
725}
726
/// Logical extension codec for the Avro file format. Currently serializes no
/// options; see the `try_decode_file_format` implementation for its
/// placeholder behavior. Plan/table-provider methods are unimplemented.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
729
730// TODO! This is a placeholder for now and needs to be implemented for real.
731impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
732    fn try_decode(
733        &self,
734        _buf: &[u8],
735        _inputs: &[datafusion_expr::LogicalPlan],
736        _ctx: &SessionContext,
737    ) -> datafusion_common::Result<datafusion_expr::Extension> {
738        not_impl_err!("Method not implemented")
739    }
740
741    fn try_encode(
742        &self,
743        _node: &datafusion_expr::Extension,
744        _buf: &mut Vec<u8>,
745    ) -> datafusion_common::Result<()> {
746        not_impl_err!("Method not implemented")
747    }
748
749    fn try_decode_table_provider(
750        &self,
751        _buf: &[u8],
752        _table_ref: &TableReference,
753        _schema: arrow::datatypes::SchemaRef,
754        _cts: &SessionContext,
755    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
756        not_impl_err!("Method not implemented")
757    }
758
759    fn try_encode_table_provider(
760        &self,
761        _table_ref: &TableReference,
762        _node: Arc<dyn datafusion::datasource::TableProvider>,
763        _buf: &mut Vec<u8>,
764    ) -> datafusion_common::Result<()> {
765        not_impl_err!("Method not implemented")
766    }
767
768    fn try_decode_file_format(
769        &self,
770        __buf: &[u8],
771        __ctx: &SessionContext,
772    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
773        Ok(Arc::new(ArrowFormatFactory::new()))
774    }
775
776    fn try_encode_file_format(
777        &self,
778        __buf: &mut Vec<u8>,
779        __node: Arc<dyn FileFormatFactory>,
780    ) -> datafusion_common::Result<()> {
781        Ok(())
782    }
783}