datafusion_proto/logical_plan/file_formats.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
    TableReference, exec_datafusion_err, exec_err, not_impl_err,
    parsers::CompressionTypeVariant,
};
use datafusion_datasource::file_format::FileFormatFactory;
use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
use datafusion_datasource_csv::file_format::CsvFormatFactory;
use datafusion_datasource_json::file_format::JsonFormatFactory;
use datafusion_execution::TaskContext;
use prost::Message;

use super::LogicalExtensionCodec;
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;

impl CsvOptionsProto {
    fn from_factory(factory: &CsvFormatFactory) -> Self {
        if let Some(options) = &factory.options {
            CsvOptionsProto {
                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
                delimiter: vec![options.delimiter],
                quote: vec![options.quote],
                terminator: options.terminator.map_or(vec![], |v| vec![v]),
                escape: options.escape.map_or(vec![], |v| vec![v]),
                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
                compression: options.compression as i32,
                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
                date_format: options.date_format.clone().unwrap_or_default(),
                datetime_format: options.datetime_format.clone().unwrap_or_default(),
                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
                timestamp_tz_format: options
                    .timestamp_tz_format
                    .clone()
                    .unwrap_or_default(),
                time_format: options.time_format.clone().unwrap_or_default(),
                null_value: options.null_value.clone().unwrap_or_default(),
                null_regex: options.null_regex.clone().unwrap_or_default(),
                comment: options.comment.map_or(vec![], |v| vec![v]),
                newlines_in_values: options
                    .newlines_in_values
                    .map_or(vec![], |v| vec![v as u8]),
                truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
                compression_level: options.compression_level,
            }
        } else {
            CsvOptionsProto::default()
        }
    }
}

impl From<&CsvOptionsProto> for CsvOptions {
    fn from(proto: &CsvOptionsProto) -> Self {
        CsvOptions {
            has_header: if !proto.has_header.is_empty() {
                Some(proto.has_header[0] != 0)
            } else {
                None
            },
            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
            quote: proto.quote.first().copied().unwrap_or(b'"'),
            terminator: if !proto.terminator.is_empty() {
                Some(proto.terminator[0])
            } else {
                None
            },
            escape: if !proto.escape.is_empty() {
                Some(proto.escape[0])
            } else {
                None
            },
            double_quote: if !proto.double_quote.is_empty() {
                Some(proto.double_quote[0] != 0)
            } else {
                None
            },
            compression: match proto.compression {
                0 => CompressionTypeVariant::GZIP,
                1 => CompressionTypeVariant::BZIP2,
                2 => CompressionTypeVariant::XZ,
                3 => CompressionTypeVariant::ZSTD,
                _ => CompressionTypeVariant::UNCOMPRESSED,
            },
            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
            date_format: if proto.date_format.is_empty() {
                None
            } else {
                Some(proto.date_format.clone())
            },
            datetime_format: if proto.datetime_format.is_empty() {
                None
            } else {
                Some(proto.datetime_format.clone())
            },
            timestamp_format: if proto.timestamp_format.is_empty() {
                None
            } else {
                Some(proto.timestamp_format.clone())
            },
            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
                None
            } else {
                Some(proto.timestamp_tz_format.clone())
            },
            time_format: if proto.time_format.is_empty() {
                None
            } else {
                Some(proto.time_format.clone())
            },
            null_value: if proto.null_value.is_empty() {
                None
            } else {
                Some(proto.null_value.clone())
            },
            null_regex: if proto.null_regex.is_empty() {
                None
            } else {
                Some(proto.null_regex.clone())
            },
            comment: if !proto.comment.is_empty() {
                Some(proto.comment[0])
            } else {
                None
            },
            newlines_in_values: if proto.newlines_in_values.is_empty() {
                None
            } else {
                Some(proto.newlines_in_values[0] != 0)
            },
            truncated_rows: if proto.truncated_rows.is_empty() {
                None
            } else {
                Some(proto.truncated_rows[0] != 0)
            },
            compression_level: proto.compression_level,
        }
    }
}
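
// A minimal round-trip sketch (hypothetical test, not part of the upstream
// suite): optional scalar fields are encoded as empty/one-element byte
// vectors, so `None` becomes an empty vec on the wire and decodes back to
// `None`, while `Some(v)` survives as a single-element vec.
#[cfg(test)]
mod csv_options_proto_roundtrip_tests {
    use super::*;

    #[test]
    fn csv_options_survive_proto_roundtrip() {
        let options = CsvOptions {
            has_header: Some(true),
            delimiter: b'|',
            terminator: None,
            ..Default::default()
        };
        let factory = CsvFormatFactory {
            options: Some(options),
        };

        let proto = CsvOptionsProto::from_factory(&factory);
        let decoded: CsvOptions = (&proto).into();

        assert_eq!(decoded.has_header, Some(true));
        assert_eq!(decoded.delimiter, b'|');
        assert_eq!(decoded.terminator, None);
    }
}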

// TODO: This is a placeholder for now and needs to be implemented for real.
impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
            exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
        })?;
        let options: CsvOptions = (&proto).into();
        Ok(Arc::new(CsvFormatFactory {
            options: Some(options),
        }))
    }

    fn try_encode_file_format(
        &self,
        buf: &mut Vec<u8>,
        node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        let options =
            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
                csv_factory.options.clone().unwrap_or_default()
            } else {
                return exec_err!("Unsupported FileFormatFactory type");
            };

        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
            options: Some(options),
        });

        proto
            .encode(buf)
            .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;

        Ok(())
    }
}

impl JsonOptionsProto {
    fn from_factory(factory: &JsonFormatFactory) -> Self {
        if let Some(options) = &factory.options {
            JsonOptionsProto {
                compression: options.compression as i32,
                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
                compression_level: options.compression_level,
            }
        } else {
            JsonOptionsProto::default()
        }
    }
}

impl From<&JsonOptionsProto> for JsonOptions {
    fn from(proto: &JsonOptionsProto) -> Self {
        JsonOptions {
            compression: match proto.compression {
                0 => CompressionTypeVariant::GZIP,
                1 => CompressionTypeVariant::BZIP2,
                2 => CompressionTypeVariant::XZ,
                3 => CompressionTypeVariant::ZSTD,
                _ => CompressionTypeVariant::UNCOMPRESSED,
            },
            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
            compression_level: proto.compression_level,
        }
    }
}

#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;

// TODO: This is a placeholder for now and needs to be implemented for real.
impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
            exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
        })?;
        let options: JsonOptions = (&proto).into();
        Ok(Arc::new(JsonFormatFactory {
            options: Some(options),
        }))
    }

    fn try_encode_file_format(
        &self,
        buf: &mut Vec<u8>,
        node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        let options = if let Some(json_factory) =
            node.as_any().downcast_ref::<JsonFormatFactory>()
        {
            json_factory.options.clone().unwrap_or_default()
        } else {
            return exec_err!("Unsupported FileFormatFactory type");
        };

        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
            options: Some(options),
        });

        proto
            .encode(buf)
            .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;

        Ok(())
    }
}
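
// A minimal usage sketch (hypothetical test, not part of the upstream suite):
// the codec serializes only the factory's `JsonOptions`, so a round trip
// through `try_encode_file_format` / `try_decode_file_format` preserves the
// options and nothing else.
#[cfg(test)]
mod json_codec_roundtrip_tests {
    use super::*;

    #[test]
    fn json_options_survive_codec_roundtrip() -> datafusion_common::Result<()> {
        let codec = JsonLogicalExtensionCodec;
        let factory = Arc::new(JsonFormatFactory {
            options: Some(JsonOptions {
                schema_infer_max_rec: Some(100),
                ..Default::default()
            }),
        });

        let mut buf = Vec::new();
        codec.try_encode_file_format(&mut buf, factory)?;

        let decoded = codec.try_decode_file_format(&buf, &TaskContext::default())?;
        let json_factory = decoded
            .as_any()
            .downcast_ref::<JsonFormatFactory>()
            .expect("decoded factory should be a JsonFormatFactory");
        assert_eq!(
            json_factory
                .options
                .as_ref()
                .and_then(|o| o.schema_infer_max_rec),
            Some(100)
        );
        Ok(())
    }
}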

#[cfg(feature = "parquet")]
mod parquet {
    use super::*;

    use crate::protobuf::{
        ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
        ParquetOptions as ParquetOptionsProto,
        TableParquetOptions as TableParquetOptionsProto, parquet_column_options,
        parquet_options,
    };
    use datafusion_common::config::{
        ParquetColumnOptions, ParquetOptions, TableParquetOptions,
    };
    use datafusion_datasource_parquet::file_format::ParquetFormatFactory;

    impl TableParquetOptionsProto {
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            let global_options = if let Some(ref options) = factory.options {
                options.clone()
            } else {
                return TableParquetOptionsProto::default();
            };

            let column_specific_options = global_options.column_specific_options;
            TableParquetOptionsProto {
                global: Some(ParquetOptionsProto {
                    enable_page_index: global_options.global.enable_page_index,
                    pruning: global_options.global.pruning,
                    skip_metadata: global_options.global.skip_metadata,
                    metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                        parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                    }),
                    pushdown_filters: global_options.global.pushdown_filters,
                    reorder_filters: global_options.global.reorder_filters,
                    force_filter_selections: global_options.global.force_filter_selections,
                    data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                    write_batch_size: global_options.global.write_batch_size as u64,
                    writer_version: global_options.global.writer_version.to_string(),
                    compression_opt: global_options.global.compression.map(|compression| {
                        parquet_options::CompressionOpt::Compression(compression)
                    }),
                    dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                        parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                    }),
                    dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                    statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                        parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                    }),
                    max_row_group_size: global_options.global.max_row_group_size as u64,
                    created_by: global_options.global.created_by.clone(),
                    column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                        parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                    }),
                    statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                        parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                    }),
                    data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                    encoding_opt: global_options.global.encoding.map(|encoding| {
                        parquet_options::EncodingOpt::Encoding(encoding)
                    }),
                    bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                    bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                    bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                        parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                    }),
                    bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                        parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                    }),
                    allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                    maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                    maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                    schema_force_view_types: global_options.global.schema_force_view_types,
                    binary_as_string: global_options.global.binary_as_string,
                    skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                    coerce_int96_opt: global_options.global.coerce_int96.map(|coerce_int96| {
                        parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96)
                    }),
                    max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
                        parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
                    }),
                }),
                column_specific_options: column_specific_options
                    .into_iter()
                    .map(|(column_name, options)| ParquetColumnSpecificOptions {
                        column_name,
                        options: Some(ParquetColumnOptionsProto {
                            bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                                parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                            }),
                            encoding_opt: options.encoding.map(|encoding| {
                                parquet_column_options::EncodingOpt::Encoding(encoding)
                            }),
                            dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                                parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                            }),
                            compression_opt: options.compression.map(|compression| {
                                parquet_column_options::CompressionOpt::Compression(compression)
                            }),
                            statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                                parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                            }),
                            bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                                parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                            }),
                            bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                                parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                            }),
                        }),
                    })
                    .collect(),
                key_value_metadata: global_options
                    .key_value_metadata
                    .iter()
                    .filter_map(|(key, value)| {
                        value.as_ref().map(|v| (key.clone(), v.clone()))
                    })
                    .collect(),
            }
        }
    }

    impl From<&ParquetOptionsProto> for ParquetOptions {
        fn from(proto: &ParquetOptionsProto) -> Self {
            ParquetOptions {
                enable_page_index: proto.enable_page_index,
                pruning: proto.pruning,
                skip_metadata: proto.skip_metadata,
                metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
                }),
                pushdown_filters: proto.pushdown_filters,
                reorder_filters: proto.reorder_filters,
                force_filter_selections: proto.force_filter_selections,
                data_pagesize_limit: proto.data_pagesize_limit as usize,
                write_batch_size: proto.write_batch_size as usize,
                // TODO: consider TryFrom to avoid panicking on invalid proto data
                writer_version: proto.writer_version.parse().expect(
                    "Invalid parquet writer version in proto, expected '1.0' or '2.0'",
                ),
                compression: proto.compression_opt.as_ref().map(|opt| match opt {
                    parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
                }),
                dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
                }),
                dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
                statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
                }),
                max_row_group_size: proto.max_row_group_size as usize,
                created_by: proto.created_by.clone(),
                column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
                }),
                statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
                }),
                data_page_row_count_limit: proto.data_page_row_count_limit as usize,
                encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                    parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
                }),
                bloom_filter_on_read: proto.bloom_filter_on_read,
                bloom_filter_on_write: proto.bloom_filter_on_write,
                bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
                }),
                bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
                }),
                allow_single_file_parallelism: proto.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
                maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
                schema_force_view_types: proto.schema_force_view_types,
                binary_as_string: proto.binary_as_string,
                skip_arrow_metadata: proto.skip_arrow_metadata,
                coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                    parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
                }),
                max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
                    parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
                }),
            }
        }
    }

    impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
        fn from(proto: ParquetColumnOptionsProto) -> Self {
            ParquetColumnOptions {
                bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
                    |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
                ),
                encoding: proto
                    .encoding_opt
                    .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
                dictionary_enabled: proto.dictionary_enabled_opt.map(
                    |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
                ),
                compression: proto
                    .compression_opt
                    .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
                statistics_enabled: proto.statistics_enabled_opt.map(
                    |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
                ),
                bloom_filter_fpp: proto
                    .bloom_filter_fpp_opt
                    .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
                bloom_filter_ndv: proto
                    .bloom_filter_ndv_opt
                    .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
            }
        }
    }

    impl From<&TableParquetOptionsProto> for TableParquetOptions {
        fn from(proto: &TableParquetOptionsProto) -> Self {
            TableParquetOptions {
                global: proto
                    .global
                    .as_ref()
                    .map(ParquetOptions::from)
                    .unwrap_or_default(),
                column_specific_options: proto
                    .column_specific_options
                    .iter()
                    .map(|parquet_column_options| {
                        (
                            parquet_column_options.column_name.clone(),
                            ParquetColumnOptions::from(
                                parquet_column_options
                                    .options
                                    .clone()
                                    .unwrap_or_default(),
                            ),
                        )
                    })
                    .collect(),
                key_value_metadata: proto
                    .key_value_metadata
                    .iter()
                    .map(|(k, v)| (k.clone(), Some(v.clone())))
                    .collect(),
                crypto: Default::default(),
            }
        }
    }
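
    // A minimal round-trip sketch (hypothetical test, not part of the upstream
    // suite): `from_factory` drops `None` entries from `key_value_metadata`
    // when encoding, so only `Some` values survive the proto round trip.
    #[cfg(test)]
    mod table_parquet_options_roundtrip_tests {
        use super::*;

        #[test]
        fn key_value_metadata_roundtrip_drops_none_entries() {
            let mut options = TableParquetOptions::default();
            options
                .key_value_metadata
                .insert("kept".to_string(), Some("value".to_string()));
            options
                .key_value_metadata
                .insert("dropped".to_string(), None);

            let factory = ParquetFormatFactory {
                options: Some(options),
            };
            let proto = TableParquetOptionsProto::from_factory(&factory);
            let decoded: TableParquetOptions = (&proto).into();

            assert_eq!(
                decoded.key_value_metadata.get("kept"),
                Some(&Some("value".to_string()))
            );
            assert!(!decoded.key_value_metadata.contains_key("dropped"));
        }
    }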

    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;

    // TODO: This is a placeholder for now and needs to be implemented for real.
    impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[datafusion_expr::LogicalPlan],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<datafusion_expr::Extension> {
            not_impl_err!("Method not implemented")
        }

        fn try_encode(
            &self,
            _node: &datafusion_expr::Extension,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_table_provider(
            &self,
            _buf: &[u8],
            _table_ref: &TableReference,
            _schema: arrow::datatypes::SchemaRef,
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>>
        {
            not_impl_err!("Method not implemented")
        }

        fn try_encode_table_provider(
            &self,
            _table_ref: &TableReference,
            _node: Arc<dyn datafusion_catalog::TableProvider>,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_file_format(
            &self,
            buf: &[u8],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
            let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
            })?;
            let options: TableParquetOptions = (&proto).into();
            Ok(Arc::new(ParquetFormatFactory {
                options: Some(options),
            }))
        }

        fn try_encode_file_format(
            &self,
            buf: &mut Vec<u8>,
            node: Arc<dyn FileFormatFactory>,
        ) -> datafusion_common::Result<()> {
            let options = if let Some(parquet_factory) =
                node.as_any().downcast_ref::<ParquetFormatFactory>()
            {
                parquet_factory.options.clone().unwrap_or_default()
            } else {
                return exec_err!("Unsupported FileFormatFactory type");
            };

            let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
                options: Some(options),
            });

            proto.encode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
            })?;

            Ok(())
        }
    }
}

#[cfg(feature = "parquet")]
pub use parquet::ParquetLogicalExtensionCodec;

#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;

// TODO: This is a placeholder for now and needs to be implemented for real.
impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        // The Arrow format carries no options, so there is nothing to decode.
        Ok(Arc::new(ArrowFormatFactory::new()))
    }

    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        // The Arrow format carries no options, so there is nothing to encode.
        Ok(())
    }
}

#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;

// TODO: This is a placeholder for now and needs to be implemented for real.
impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[datafusion_expr::LogicalPlan],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<datafusion_expr::Extension> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode(
        &self,
        _node: &datafusion_expr::Extension,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: arrow::datatypes::SchemaRef,
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
        not_impl_err!("Method not implemented")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn datafusion_catalog::TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> datafusion_common::Result<()> {
        not_impl_err!("Method not implemented")
    }

    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &TaskContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        // Placeholder: returns an `ArrowFormatFactory` until a real Avro
        // format factory is wired in.
        Ok(Arc::new(ArrowFormatFactory::new()))
    }

    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        Ok(())
    }
}