datafusion_proto/logical_plan/file_formats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};
21use datafusion_common::config::{CsvOptions, JsonOptions};
22use datafusion_common::{
23    exec_datafusion_err, exec_err, not_impl_err, parsers::CompressionTypeVariant,
24    TableReference,
25};
26use datafusion_datasource::file_format::FileFormatFactory;
27use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
28use datafusion_datasource_csv::file_format::CsvFormatFactory;
29use datafusion_datasource_json::file_format::JsonFormatFactory;
30use datafusion_execution::TaskContext;
31use prost::Message;
32
33use super::LogicalExtensionCodec;
34
/// Extension codec that (de)serializes only the CSV file-format options;
/// the plan/table-provider codec methods are placeholders (see TODO below).
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
37
impl CsvOptionsProto {
    /// Serialize a [`CsvFormatFactory`]'s options into their protobuf form.
    ///
    /// Encoding conventions (mirrored by the `From<&CsvOptionsProto> for
    /// CsvOptions` impl in this file):
    /// - optional byte/bool fields become a `Vec<u8>` that is empty for
    ///   `None` and one byte long for `Some` (bools encoded as 0/1);
    /// - optional strings become the empty string for `None`.
    ///
    /// A factory without options serializes as the default message.
    fn from_factory(factory: &CsvFormatFactory) -> Self {
        if let Some(options) = &factory.options {
            CsvOptionsProto {
                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
                delimiter: vec![options.delimiter],
                quote: vec![options.quote],
                terminator: options.terminator.map_or(vec![], |v| vec![v]),
                escape: options.escape.map_or(vec![], |v| vec![v]),
                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
                // Enum discriminant is stored directly as an i32.
                compression: options.compression as i32,
                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
                date_format: options.date_format.clone().unwrap_or_default(),
                datetime_format: options.datetime_format.clone().unwrap_or_default(),
                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
                timestamp_tz_format: options
                    .timestamp_tz_format
                    .clone()
                    .unwrap_or_default(),
                time_format: options.time_format.clone().unwrap_or_default(),
                null_value: options.null_value.clone().unwrap_or_default(),
                null_regex: options.null_regex.clone().unwrap_or_default(),
                comment: options.comment.map_or(vec![], |v| vec![v]),
                newlines_in_values: options
                    .newlines_in_values
                    .map_or(vec![], |v| vec![v as u8]),
                truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
            }
        } else {
            CsvOptionsProto::default()
        }
    }
}
71
72impl From<&CsvOptionsProto> for CsvOptions {
73    fn from(proto: &CsvOptionsProto) -> Self {
74        CsvOptions {
75            has_header: if !proto.has_header.is_empty() {
76                Some(proto.has_header[0] != 0)
77            } else {
78                None
79            },
80            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
81            quote: proto.quote.first().copied().unwrap_or(b'"'),
82            terminator: if !proto.terminator.is_empty() {
83                Some(proto.terminator[0])
84            } else {
85                None
86            },
87            escape: if !proto.escape.is_empty() {
88                Some(proto.escape[0])
89            } else {
90                None
91            },
92            double_quote: if !proto.double_quote.is_empty() {
93                Some(proto.double_quote[0] != 0)
94            } else {
95                None
96            },
97            compression: match proto.compression {
98                0 => CompressionTypeVariant::GZIP,
99                1 => CompressionTypeVariant::BZIP2,
100                2 => CompressionTypeVariant::XZ,
101                3 => CompressionTypeVariant::ZSTD,
102                _ => CompressionTypeVariant::UNCOMPRESSED,
103            },
104            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
105            date_format: if proto.date_format.is_empty() {
106                None
107            } else {
108                Some(proto.date_format.clone())
109            },
110            datetime_format: if proto.datetime_format.is_empty() {
111                None
112            } else {
113                Some(proto.datetime_format.clone())
114            },
115            timestamp_format: if proto.timestamp_format.is_empty() {
116                None
117            } else {
118                Some(proto.timestamp_format.clone())
119            },
120            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
121                None
122            } else {
123                Some(proto.timestamp_tz_format.clone())
124            },
125            time_format: if proto.time_format.is_empty() {
126                None
127            } else {
128                Some(proto.time_format.clone())
129            },
130            null_value: if proto.null_value.is_empty() {
131                None
132            } else {
133                Some(proto.null_value.clone())
134            },
135            null_regex: if proto.null_regex.is_empty() {
136                None
137            } else {
138                Some(proto.null_regex.clone())
139            },
140            comment: if !proto.comment.is_empty() {
141                Some(proto.comment[0])
142            } else {
143                None
144            },
145            newlines_in_values: if proto.newlines_in_values.is_empty() {
146                None
147            } else {
148                Some(proto.newlines_in_values[0] != 0)
149            },
150            truncated_rows: if proto.truncated_rows.is_empty() {
151                None
152            } else {
153                Some(proto.truncated_rows[0] != 0)
154            },
155        }
156    }
157}
158
159// TODO! This is a placeholder for now and needs to be implemented for real.
160impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
161    fn try_decode(
162        &self,
163        _buf: &[u8],
164        _inputs: &[datafusion_expr::LogicalPlan],
165        _ctx: &TaskContext,
166    ) -> datafusion_common::Result<datafusion_expr::Extension> {
167        not_impl_err!("Method not implemented")
168    }
169
170    fn try_encode(
171        &self,
172        _node: &datafusion_expr::Extension,
173        _buf: &mut Vec<u8>,
174    ) -> datafusion_common::Result<()> {
175        not_impl_err!("Method not implemented")
176    }
177
178    fn try_decode_table_provider(
179        &self,
180        _buf: &[u8],
181        _table_ref: &TableReference,
182        _schema: arrow::datatypes::SchemaRef,
183        _ctx: &TaskContext,
184    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
185        not_impl_err!("Method not implemented")
186    }
187
188    fn try_encode_table_provider(
189        &self,
190        _table_ref: &TableReference,
191        _node: Arc<dyn datafusion_catalog::TableProvider>,
192        _buf: &mut Vec<u8>,
193    ) -> datafusion_common::Result<()> {
194        not_impl_err!("Method not implemented")
195    }
196
197    fn try_decode_file_format(
198        &self,
199        buf: &[u8],
200        _ctx: &TaskContext,
201    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
202        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
203            exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
204        })?;
205        let options: CsvOptions = (&proto).into();
206        Ok(Arc::new(CsvFormatFactory {
207            options: Some(options),
208        }))
209    }
210
211    fn try_encode_file_format(
212        &self,
213        buf: &mut Vec<u8>,
214        node: Arc<dyn FileFormatFactory>,
215    ) -> datafusion_common::Result<()> {
216        let options =
217            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
218                csv_factory.options.clone().unwrap_or_default()
219            } else {
220                return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
221            };
222
223        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
224            options: Some(options),
225        });
226
227        proto
228            .encode(buf)
229            .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;
230
231        Ok(())
232    }
233}
234
235impl JsonOptionsProto {
236    fn from_factory(factory: &JsonFormatFactory) -> Self {
237        if let Some(options) = &factory.options {
238            JsonOptionsProto {
239                compression: options.compression as i32,
240                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
241            }
242        } else {
243            JsonOptionsProto::default()
244        }
245    }
246}
247
248impl From<&JsonOptionsProto> for JsonOptions {
249    fn from(proto: &JsonOptionsProto) -> Self {
250        JsonOptions {
251            compression: match proto.compression {
252                0 => CompressionTypeVariant::GZIP,
253                1 => CompressionTypeVariant::BZIP2,
254                2 => CompressionTypeVariant::XZ,
255                3 => CompressionTypeVariant::ZSTD,
256                _ => CompressionTypeVariant::UNCOMPRESSED,
257            },
258            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
259        }
260    }
261}
262
/// Extension codec that (de)serializes only the JSON file-format options;
/// the plan/table-provider codec methods are placeholders (see TODO below).
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
265
266// TODO! This is a placeholder for now and needs to be implemented for real.
267impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
268    fn try_decode(
269        &self,
270        _buf: &[u8],
271        _inputs: &[datafusion_expr::LogicalPlan],
272        _ctx: &TaskContext,
273    ) -> datafusion_common::Result<datafusion_expr::Extension> {
274        not_impl_err!("Method not implemented")
275    }
276
277    fn try_encode(
278        &self,
279        _node: &datafusion_expr::Extension,
280        _buf: &mut Vec<u8>,
281    ) -> datafusion_common::Result<()> {
282        not_impl_err!("Method not implemented")
283    }
284
285    fn try_decode_table_provider(
286        &self,
287        _buf: &[u8],
288        _table_ref: &TableReference,
289        _schema: arrow::datatypes::SchemaRef,
290        _ctx: &TaskContext,
291    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
292        not_impl_err!("Method not implemented")
293    }
294
295    fn try_encode_table_provider(
296        &self,
297        _table_ref: &TableReference,
298        _node: Arc<dyn datafusion_catalog::TableProvider>,
299        _buf: &mut Vec<u8>,
300    ) -> datafusion_common::Result<()> {
301        not_impl_err!("Method not implemented")
302    }
303
304    fn try_decode_file_format(
305        &self,
306        buf: &[u8],
307        _ctx: &TaskContext,
308    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
309        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
310            exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
311        })?;
312        let options: JsonOptions = (&proto).into();
313        Ok(Arc::new(JsonFormatFactory {
314            options: Some(options),
315        }))
316    }
317
318    fn try_encode_file_format(
319        &self,
320        buf: &mut Vec<u8>,
321        node: Arc<dyn FileFormatFactory>,
322    ) -> datafusion_common::Result<()> {
323        let options = if let Some(json_factory) =
324            node.as_any().downcast_ref::<JsonFormatFactory>()
325        {
326            json_factory.options.clone().unwrap_or_default()
327        } else {
328            return exec_err!("Unsupported FileFormatFactory type");
329        };
330
331        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
332            options: Some(options),
333        });
334
335        proto
336            .encode(buf)
337            .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;
338
339        Ok(())
340    }
341}
342
343#[cfg(feature = "parquet")]
344mod parquet {
345    use super::*;
346
347    use crate::protobuf::{
348        parquet_column_options, parquet_options,
349        ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
350        ParquetOptions as ParquetOptionsProto,
351        TableParquetOptions as TableParquetOptionsProto,
352    };
353    use datafusion_common::config::{
354        ParquetColumnOptions, ParquetOptions, TableParquetOptions,
355    };
356    use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
357
    impl TableParquetOptionsProto {
        /// Serialize a [`ParquetFormatFactory`]'s table-level options into
        /// their protobuf form: global reader/writer settings, per-column
        /// overrides, and key/value metadata.
        ///
        /// Optional scalars are wrapped in their generated `oneof` types;
        /// `usize` fields are widened to `u64` for the wire format. A factory
        /// without options serializes as the default message.
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            let global_options = if let Some(ref options) = factory.options {
                options.clone()
            } else {
                return TableParquetOptionsProto::default();
            };

            let column_specific_options = global_options.column_specific_options;
            #[allow(deprecated)] // max_statistics_size
        TableParquetOptionsProto {
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
                max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
                    parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
                }),
            }),
            // One entry per column override, each carrying its own oneof-wrapped options.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                    })
                }
            }).collect(),
            // Metadata entries with a None value are dropped on the wire
            // (the proto map cannot represent them).
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
        }
    }
461
    impl From<&ParquetOptionsProto> for ParquetOptions {
        /// Decode the protobuf message back into [`ParquetOptions`],
        /// unwrapping each single-variant `oneof` into a plain `Option` and
        /// narrowing `u64` wire fields back to `usize`.
        fn from(proto: &ParquetOptionsProto) -> Self {
            #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
            max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
                parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
            }),
        }
        }
    }
522
523    impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
524        fn from(proto: ParquetColumnOptionsProto) -> Self {
525            #[allow(deprecated)] // max_statistics_size
526        ParquetColumnOptions {
527            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
528                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
529            ),
530            encoding: proto
531                .encoding_opt
532                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
533            dictionary_enabled: proto.dictionary_enabled_opt.map(
534                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
535            ),
536            compression: proto
537                .compression_opt
538                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
539            statistics_enabled: proto.statistics_enabled_opt.map(
540                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
541            ),
542            bloom_filter_fpp: proto
543                .bloom_filter_fpp_opt
544                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
545            bloom_filter_ndv: proto
546                .bloom_filter_ndv_opt
547                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
548        }
549        }
550    }
551
552    impl From<&TableParquetOptionsProto> for TableParquetOptions {
553        fn from(proto: &TableParquetOptionsProto) -> Self {
554            TableParquetOptions {
555                global: proto
556                    .global
557                    .as_ref()
558                    .map(ParquetOptions::from)
559                    .unwrap_or_default(),
560                column_specific_options: proto
561                    .column_specific_options
562                    .iter()
563                    .map(|parquet_column_options| {
564                        (
565                            parquet_column_options.column_name.clone(),
566                            ParquetColumnOptions::from(
567                                parquet_column_options
568                                    .options
569                                    .clone()
570                                    .unwrap_or_default(),
571                            ),
572                        )
573                    })
574                    .collect(),
575                key_value_metadata: proto
576                    .key_value_metadata
577                    .iter()
578                    .map(|(k, v)| (k.clone(), Some(v.clone())))
579                    .collect(),
580                crypto: Default::default(),
581            }
582        }
583    }
584
    /// Extension codec that (de)serializes only the Parquet file-format
    /// options; the plan/table-provider codec methods are placeholders
    /// (see TODO below).
    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;
587
588    // TODO! This is a placeholder for now and needs to be implemented for real.
589    impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
590        fn try_decode(
591            &self,
592            _buf: &[u8],
593            _inputs: &[datafusion_expr::LogicalPlan],
594            _ctx: &TaskContext,
595        ) -> datafusion_common::Result<datafusion_expr::Extension> {
596            not_impl_err!("Method not implemented")
597        }
598
599        fn try_encode(
600            &self,
601            _node: &datafusion_expr::Extension,
602            _buf: &mut Vec<u8>,
603        ) -> datafusion_common::Result<()> {
604            not_impl_err!("Method not implemented")
605        }
606
607        fn try_decode_table_provider(
608            &self,
609            _buf: &[u8],
610            _table_ref: &TableReference,
611            _schema: arrow::datatypes::SchemaRef,
612            _ctx: &TaskContext,
613        ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>>
614        {
615            not_impl_err!("Method not implemented")
616        }
617
618        fn try_encode_table_provider(
619            &self,
620            _table_ref: &TableReference,
621            _node: Arc<dyn datafusion_catalog::TableProvider>,
622            _buf: &mut Vec<u8>,
623        ) -> datafusion_common::Result<()> {
624            not_impl_err!("Method not implemented")
625        }
626
627        fn try_decode_file_format(
628            &self,
629            buf: &[u8],
630            _ctx: &TaskContext,
631        ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
632            let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
633                exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
634            })?;
635            let options: TableParquetOptions = (&proto).into();
636            Ok(Arc::new(
637                datafusion_datasource_parquet::file_format::ParquetFormatFactory {
638                    options: Some(options),
639                },
640            ))
641        }
642
643        fn try_encode_file_format(
644            &self,
645            buf: &mut Vec<u8>,
646            node: Arc<dyn FileFormatFactory>,
647        ) -> datafusion_common::Result<()> {
648            use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
649
650            let options = if let Some(parquet_factory) =
651                node.as_any().downcast_ref::<ParquetFormatFactory>()
652            {
653                parquet_factory.options.clone().unwrap_or_default()
654            } else {
655                return exec_err!("Unsupported FileFormatFactory type");
656            };
657
658            let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
659                options: Some(options),
660            });
661
662            proto.encode(buf).map_err(|e| {
663                exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
664            })?;
665
666            Ok(())
667        }
668    }
669}
670#[cfg(feature = "parquet")]
671pub use parquet::ParquetLogicalExtensionCodec;
672
/// Extension codec for the Arrow file format, which carries no options, so
/// (de)serialization is trivial; the remaining codec methods are placeholders.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
675
676// TODO! This is a placeholder for now and needs to be implemented for real.
677impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
678    fn try_decode(
679        &self,
680        _buf: &[u8],
681        _inputs: &[datafusion_expr::LogicalPlan],
682        _ctx: &TaskContext,
683    ) -> datafusion_common::Result<datafusion_expr::Extension> {
684        not_impl_err!("Method not implemented")
685    }
686
687    fn try_encode(
688        &self,
689        _node: &datafusion_expr::Extension,
690        _buf: &mut Vec<u8>,
691    ) -> datafusion_common::Result<()> {
692        not_impl_err!("Method not implemented")
693    }
694
695    fn try_decode_table_provider(
696        &self,
697        _buf: &[u8],
698        _table_ref: &TableReference,
699        _schema: arrow::datatypes::SchemaRef,
700        _ctx: &TaskContext,
701    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
702        not_impl_err!("Method not implemented")
703    }
704
705    fn try_encode_table_provider(
706        &self,
707        _table_ref: &TableReference,
708        _node: Arc<dyn datafusion_catalog::TableProvider>,
709        _buf: &mut Vec<u8>,
710    ) -> datafusion_common::Result<()> {
711        not_impl_err!("Method not implemented")
712    }
713
714    fn try_decode_file_format(
715        &self,
716        __buf: &[u8],
717        __ctx: &TaskContext,
718    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
719        Ok(Arc::new(ArrowFormatFactory::new()))
720    }
721
722    fn try_encode_file_format(
723        &self,
724        __buf: &mut Vec<u8>,
725        __node: Arc<dyn FileFormatFactory>,
726    ) -> datafusion_common::Result<()> {
727        Ok(())
728    }
729}
730
/// Placeholder extension codec for the Avro file format; all methods are
/// stubs pending a real implementation (see TODO below).
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
733
734// TODO! This is a placeholder for now and needs to be implemented for real.
735impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
736    fn try_decode(
737        &self,
738        _buf: &[u8],
739        _inputs: &[datafusion_expr::LogicalPlan],
740        _ctx: &TaskContext,
741    ) -> datafusion_common::Result<datafusion_expr::Extension> {
742        not_impl_err!("Method not implemented")
743    }
744
745    fn try_encode(
746        &self,
747        _node: &datafusion_expr::Extension,
748        _buf: &mut Vec<u8>,
749    ) -> datafusion_common::Result<()> {
750        not_impl_err!("Method not implemented")
751    }
752
753    fn try_decode_table_provider(
754        &self,
755        _buf: &[u8],
756        _table_ref: &TableReference,
757        _schema: arrow::datatypes::SchemaRef,
758        _cts: &TaskContext,
759    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
760        not_impl_err!("Method not implemented")
761    }
762
763    fn try_encode_table_provider(
764        &self,
765        _table_ref: &TableReference,
766        _node: Arc<dyn datafusion_catalog::TableProvider>,
767        _buf: &mut Vec<u8>,
768    ) -> datafusion_common::Result<()> {
769        not_impl_err!("Method not implemented")
770    }
771
772    fn try_decode_file_format(
773        &self,
774        __buf: &[u8],
775        __ctx: &TaskContext,
776    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
777        Ok(Arc::new(ArrowFormatFactory::new()))
778    }
779
780    fn try_encode_file_format(
781        &self,
782        __buf: &mut Vec<u8>,
783        __node: Arc<dyn FileFormatFactory>,
784    ) -> datafusion_common::Result<()> {
785        Ok(())
786    }
787}