Skip to main content

datafusion_proto/logical_plan/
file_formats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};
21use datafusion_common::config::{CsvOptions, JsonOptions};
22use datafusion_common::{
23    TableReference, exec_datafusion_err, exec_err, not_impl_err,
24    parsers::CompressionTypeVariant,
25};
26use datafusion_datasource::file_format::FileFormatFactory;
27use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
28use datafusion_datasource_csv::file_format::CsvFormatFactory;
29use datafusion_datasource_json::file_format::JsonFormatFactory;
30use datafusion_execution::TaskContext;
31use prost::Message;
32
33use super::LogicalExtensionCodec;
34
/// Extension codec that (de)serializes CSV file-format options via protobuf.
/// All plan/table-provider codec methods are placeholders (`not_impl_err!`).
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
37
impl CsvOptionsProto {
    /// Serialize the factory's [`CsvOptions`] into the protobuf message.
    ///
    /// Wire encoding for optional scalars: an empty byte vector means `None`,
    /// a single byte carries the value (bools as 0/1). Optional strings use
    /// the empty string for `None`. A factory with no options serializes as
    /// the all-default proto.
    fn from_factory(factory: &CsvFormatFactory) -> Self {
        if let Some(options) = &factory.options {
            CsvOptionsProto {
                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
                delimiter: vec![options.delimiter],
                quote: vec![options.quote],
                terminator: options.terminator.map_or(vec![], |v| vec![v]),
                escape: options.escape.map_or(vec![], |v| vec![v]),
                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
                // Enum discriminant travels as its i32 tag.
                compression: options.compression as i32,
                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
                date_format: options.date_format.clone().unwrap_or_default(),
                datetime_format: options.datetime_format.clone().unwrap_or_default(),
                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
                timestamp_tz_format: options
                    .timestamp_tz_format
                    .clone()
                    .unwrap_or_default(),
                time_format: options.time_format.clone().unwrap_or_default(),
                null_value: options.null_value.clone().unwrap_or_default(),
                null_regex: options.null_regex.clone().unwrap_or_default(),
                comment: options.comment.map_or(vec![], |v| vec![v]),
                newlines_in_values: options
                    .newlines_in_values
                    .map_or(vec![], |v| vec![v as u8]),
                truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
                compression_level: options.compression_level,
            }
        } else {
            CsvOptionsProto::default()
        }
    }
}
72
73impl From<&CsvOptionsProto> for CsvOptions {
74    fn from(proto: &CsvOptionsProto) -> Self {
75        CsvOptions {
76            has_header: if !proto.has_header.is_empty() {
77                Some(proto.has_header[0] != 0)
78            } else {
79                None
80            },
81            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
82            quote: proto.quote.first().copied().unwrap_or(b'"'),
83            terminator: if !proto.terminator.is_empty() {
84                Some(proto.terminator[0])
85            } else {
86                None
87            },
88            escape: if !proto.escape.is_empty() {
89                Some(proto.escape[0])
90            } else {
91                None
92            },
93            double_quote: if !proto.double_quote.is_empty() {
94                Some(proto.double_quote[0] != 0)
95            } else {
96                None
97            },
98            compression: match proto.compression {
99                0 => CompressionTypeVariant::GZIP,
100                1 => CompressionTypeVariant::BZIP2,
101                2 => CompressionTypeVariant::XZ,
102                3 => CompressionTypeVariant::ZSTD,
103                _ => CompressionTypeVariant::UNCOMPRESSED,
104            },
105            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
106            date_format: if proto.date_format.is_empty() {
107                None
108            } else {
109                Some(proto.date_format.clone())
110            },
111            datetime_format: if proto.datetime_format.is_empty() {
112                None
113            } else {
114                Some(proto.datetime_format.clone())
115            },
116            timestamp_format: if proto.timestamp_format.is_empty() {
117                None
118            } else {
119                Some(proto.timestamp_format.clone())
120            },
121            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
122                None
123            } else {
124                Some(proto.timestamp_tz_format.clone())
125            },
126            time_format: if proto.time_format.is_empty() {
127                None
128            } else {
129                Some(proto.time_format.clone())
130            },
131            null_value: if proto.null_value.is_empty() {
132                None
133            } else {
134                Some(proto.null_value.clone())
135            },
136            null_regex: if proto.null_regex.is_empty() {
137                None
138            } else {
139                Some(proto.null_regex.clone())
140            },
141            comment: if !proto.comment.is_empty() {
142                Some(proto.comment[0])
143            } else {
144                None
145            },
146            newlines_in_values: if proto.newlines_in_values.is_empty() {
147                None
148            } else {
149                Some(proto.newlines_in_values[0] != 0)
150            },
151            truncated_rows: if proto.truncated_rows.is_empty() {
152                None
153            } else {
154                Some(proto.truncated_rows[0] != 0)
155            },
156            compression_level: proto.compression_level,
157        }
158    }
159}
160
161// TODO! This is a placeholder for now and needs to be implemented for real.
162impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
163    fn try_decode(
164        &self,
165        _buf: &[u8],
166        _inputs: &[datafusion_expr::LogicalPlan],
167        _ctx: &TaskContext,
168    ) -> datafusion_common::Result<datafusion_expr::Extension> {
169        not_impl_err!("Method not implemented")
170    }
171
172    fn try_encode(
173        &self,
174        _node: &datafusion_expr::Extension,
175        _buf: &mut Vec<u8>,
176    ) -> datafusion_common::Result<()> {
177        not_impl_err!("Method not implemented")
178    }
179
180    fn try_decode_table_provider(
181        &self,
182        _buf: &[u8],
183        _table_ref: &TableReference,
184        _schema: arrow::datatypes::SchemaRef,
185        _ctx: &TaskContext,
186    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
187        not_impl_err!("Method not implemented")
188    }
189
190    fn try_encode_table_provider(
191        &self,
192        _table_ref: &TableReference,
193        _node: Arc<dyn datafusion_catalog::TableProvider>,
194        _buf: &mut Vec<u8>,
195    ) -> datafusion_common::Result<()> {
196        not_impl_err!("Method not implemented")
197    }
198
199    fn try_decode_file_format(
200        &self,
201        buf: &[u8],
202        _ctx: &TaskContext,
203    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
204        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
205            exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
206        })?;
207        let options: CsvOptions = (&proto).into();
208        Ok(Arc::new(CsvFormatFactory {
209            options: Some(options),
210        }))
211    }
212
213    fn try_encode_file_format(
214        &self,
215        buf: &mut Vec<u8>,
216        node: Arc<dyn FileFormatFactory>,
217    ) -> datafusion_common::Result<()> {
218        let options =
219            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
220                csv_factory.options.clone().unwrap_or_default()
221            } else {
222                return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
223            };
224
225        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
226            options: Some(options),
227        });
228
229        proto
230            .encode(buf)
231            .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;
232
233        Ok(())
234    }
235}
236
237impl JsonOptionsProto {
238    fn from_factory(factory: &JsonFormatFactory) -> Self {
239        if let Some(options) = &factory.options {
240            JsonOptionsProto {
241                compression: options.compression as i32,
242                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
243                compression_level: options.compression_level,
244                newline_delimited: Some(options.newline_delimited),
245            }
246        } else {
247            JsonOptionsProto::default()
248        }
249    }
250}
251
252impl From<&JsonOptionsProto> for JsonOptions {
253    fn from(proto: &JsonOptionsProto) -> Self {
254        JsonOptions {
255            compression: match proto.compression {
256                0 => CompressionTypeVariant::GZIP,
257                1 => CompressionTypeVariant::BZIP2,
258                2 => CompressionTypeVariant::XZ,
259                3 => CompressionTypeVariant::ZSTD,
260                _ => CompressionTypeVariant::UNCOMPRESSED,
261            },
262            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
263            compression_level: proto.compression_level,
264            newline_delimited: proto.newline_delimited.unwrap_or(true),
265        }
266    }
267}
268
/// Extension codec that (de)serializes JSON file-format options via protobuf.
/// All plan/table-provider codec methods are placeholders (`not_impl_err!`).
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
271
272// TODO! This is a placeholder for now and needs to be implemented for real.
273impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
274    fn try_decode(
275        &self,
276        _buf: &[u8],
277        _inputs: &[datafusion_expr::LogicalPlan],
278        _ctx: &TaskContext,
279    ) -> datafusion_common::Result<datafusion_expr::Extension> {
280        not_impl_err!("Method not implemented")
281    }
282
283    fn try_encode(
284        &self,
285        _node: &datafusion_expr::Extension,
286        _buf: &mut Vec<u8>,
287    ) -> datafusion_common::Result<()> {
288        not_impl_err!("Method not implemented")
289    }
290
291    fn try_decode_table_provider(
292        &self,
293        _buf: &[u8],
294        _table_ref: &TableReference,
295        _schema: arrow::datatypes::SchemaRef,
296        _ctx: &TaskContext,
297    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
298        not_impl_err!("Method not implemented")
299    }
300
301    fn try_encode_table_provider(
302        &self,
303        _table_ref: &TableReference,
304        _node: Arc<dyn datafusion_catalog::TableProvider>,
305        _buf: &mut Vec<u8>,
306    ) -> datafusion_common::Result<()> {
307        not_impl_err!("Method not implemented")
308    }
309
310    fn try_decode_file_format(
311        &self,
312        buf: &[u8],
313        _ctx: &TaskContext,
314    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
315        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
316            exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
317        })?;
318        let options: JsonOptions = (&proto).into();
319        Ok(Arc::new(JsonFormatFactory {
320            options: Some(options),
321        }))
322    }
323
324    fn try_encode_file_format(
325        &self,
326        buf: &mut Vec<u8>,
327        node: Arc<dyn FileFormatFactory>,
328    ) -> datafusion_common::Result<()> {
329        let options = if let Some(json_factory) =
330            node.as_any().downcast_ref::<JsonFormatFactory>()
331        {
332            json_factory.options.clone().unwrap_or_default()
333        } else {
334            return exec_err!("Unsupported FileFormatFactory type");
335        };
336
337        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
338            options: Some(options),
339        });
340
341        proto
342            .encode(buf)
343            .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;
344
345        Ok(())
346    }
347}
348
349#[cfg(feature = "parquet")]
350mod parquet {
351    use super::*;
352
353    use crate::protobuf::{
354        ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
355        ParquetOptions as ParquetOptionsProto,
356        TableParquetOptions as TableParquetOptionsProto, parquet_column_options,
357        parquet_options,
358    };
359    use datafusion_common::config::{
360        ParquetColumnOptions, ParquetOptions, TableParquetOptions,
361    };
362    use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
363
    impl TableParquetOptionsProto {
        /// Serialize the factory's [`TableParquetOptions`] into the protobuf
        /// message.
        ///
        /// Optional settings are modeled as protobuf `oneof` wrappers
        /// (`*_opt`); `usize` size fields are widened to `u64` for the wire.
        /// A factory without options serializes as the all-default proto.
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            let global_options = if let Some(ref options) = factory.options {
                options.clone()
            } else {
                return TableParquetOptionsProto::default();
            };

            let column_specific_options = global_options.column_specific_options;
            TableParquetOptionsProto {
                // Session-wide reader/writer settings.
                global: Some(ParquetOptionsProto {
                    enable_page_index: global_options.global.enable_page_index,
                    pruning: global_options.global.pruning,
                    skip_metadata: global_options.global.skip_metadata,
                    metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                        parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                    }),
                    pushdown_filters: global_options.global.pushdown_filters,
                    reorder_filters: global_options.global.reorder_filters,
                    force_filter_selections: global_options.global.force_filter_selections,
                    data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                    write_batch_size: global_options.global.write_batch_size as u64,
                    writer_version: global_options.global.writer_version.to_string(),
                    compression_opt: global_options.global.compression.map(|compression| {
                        parquet_options::CompressionOpt::Compression(compression)
                    }),
                    dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                        parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                    }),
                    dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                    statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                        parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                    }),
                    max_row_group_size: global_options.global.max_row_group_size as u64,
                    created_by: global_options.global.created_by.clone(),
                    column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                        parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                    }),
                    statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                        parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                    }),
                    data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                    encoding_opt: global_options.global.encoding.map(|encoding| {
                        parquet_options::EncodingOpt::Encoding(encoding)
                    }),
                    bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                    bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                    bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                        parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                    }),
                    bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                        parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                    }),
                    allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                    maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                    maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                    schema_force_view_types: global_options.global.schema_force_view_types,
                    binary_as_string: global_options.global.binary_as_string,
                    skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                    coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                        parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                    }),
                    max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| {
                        parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
                    }),
                }),
                // Per-column overrides, keyed by column name.
                column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                    ParquetColumnSpecificOptions {
                        column_name,
                        options: Some(ParquetColumnOptionsProto {
                            bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                                parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                            }),
                            encoding_opt: options.encoding.map(|encoding| {
                                parquet_column_options::EncodingOpt::Encoding(encoding)
                            }),
                            dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                                parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                            }),
                            compression_opt: options.compression.map(|compression| {
                                parquet_column_options::CompressionOpt::Compression(compression)
                            }),
                            statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                                parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                            }),
                            bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                                parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                            }),
                            bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                                parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                            }),
                        })
                    }
                }).collect(),
                // Only `Some` metadata values survive serialization; `None`
                // entries are dropped here (and cannot round-trip).
                key_value_metadata: global_options.key_value_metadata
                    .iter()
                    .filter_map(|(key, value)| {
                        value.as_ref().map(|v| (key.clone(), v.clone()))
                    })
                    .collect(),
            }
        }
    }
467
    impl From<&ParquetOptionsProto> for ParquetOptions {
        /// Rebuild [`ParquetOptions`] from the wire format, unwrapping the
        /// single-variant `oneof` wrappers and narrowing `u64` size fields
        /// back to `usize`.
        ///
        /// # Panics
        /// Panics if `writer_version` does not parse (see TODO below).
        fn from(proto: &ParquetOptionsProto) -> Self {
            ParquetOptions {
                enable_page_index: proto.enable_page_index,
                pruning: proto.pruning,
                skip_metadata: proto.skip_metadata,
                metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
                }),
                pushdown_filters: proto.pushdown_filters,
                reorder_filters: proto.reorder_filters,
                force_filter_selections: proto.force_filter_selections,
                data_pagesize_limit: proto.data_pagesize_limit as usize,
                write_batch_size: proto.write_batch_size as usize,
                // TODO: Consider changing to TryFrom to avoid panic on invalid proto data
                writer_version: proto.writer_version.parse().expect("
                Invalid parquet writer version in proto, expected '1.0' or '2.0'
            "),
                compression: proto.compression_opt.as_ref().map(|opt| match opt {
                    parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
                }),
                dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
                }),
                dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
                statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
                }),
                max_row_group_size: proto.max_row_group_size as usize,
                created_by: proto.created_by.clone(),
                column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
                }),
                statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
                }),
                data_page_row_count_limit: proto.data_page_row_count_limit as usize,
                encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                    parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
                }),
                bloom_filter_on_read: proto.bloom_filter_on_read,
                bloom_filter_on_write: proto.bloom_filter_on_write,
                bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
                }),
                bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
                }),
                allow_single_file_parallelism: proto.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
                maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
                schema_force_view_types: proto.schema_force_view_types,
                binary_as_string: proto.binary_as_string,
                skip_arrow_metadata: proto.skip_arrow_metadata,
                coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                    parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
                }),
                max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
                    parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
                }),
            }
        }
    }
531
532    impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
533        fn from(proto: ParquetColumnOptionsProto) -> Self {
534            ParquetColumnOptions {
535            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
536                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
537            ),
538            encoding: proto
539                .encoding_opt
540                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
541            dictionary_enabled: proto.dictionary_enabled_opt.map(
542                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
543            ),
544            compression: proto
545                .compression_opt
546                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
547            statistics_enabled: proto.statistics_enabled_opt.map(
548                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
549            ),
550            bloom_filter_fpp: proto
551                .bloom_filter_fpp_opt
552                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
553            bloom_filter_ndv: proto
554                .bloom_filter_ndv_opt
555                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
556        }
557        }
558    }
559
560    impl From<&TableParquetOptionsProto> for TableParquetOptions {
561        fn from(proto: &TableParquetOptionsProto) -> Self {
562            TableParquetOptions {
563                global: proto
564                    .global
565                    .as_ref()
566                    .map(ParquetOptions::from)
567                    .unwrap_or_default(),
568                column_specific_options: proto
569                    .column_specific_options
570                    .iter()
571                    .map(|parquet_column_options| {
572                        (
573                            parquet_column_options.column_name.clone(),
574                            ParquetColumnOptions::from(
575                                parquet_column_options
576                                    .options
577                                    .clone()
578                                    .unwrap_or_default(),
579                            ),
580                        )
581                    })
582                    .collect(),
583                key_value_metadata: proto
584                    .key_value_metadata
585                    .iter()
586                    .map(|(k, v)| (k.clone(), Some(v.clone())))
587                    .collect(),
588                crypto: Default::default(),
589            }
590        }
591    }
592
    /// Extension codec that (de)serializes Parquet table options via
    /// protobuf. All plan/table-provider codec methods are placeholders
    /// (`not_impl_err!`).
    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;
595
596    // TODO! This is a placeholder for now and needs to be implemented for real.
597    impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
598        fn try_decode(
599            &self,
600            _buf: &[u8],
601            _inputs: &[datafusion_expr::LogicalPlan],
602            _ctx: &TaskContext,
603        ) -> datafusion_common::Result<datafusion_expr::Extension> {
604            not_impl_err!("Method not implemented")
605        }
606
607        fn try_encode(
608            &self,
609            _node: &datafusion_expr::Extension,
610            _buf: &mut Vec<u8>,
611        ) -> datafusion_common::Result<()> {
612            not_impl_err!("Method not implemented")
613        }
614
615        fn try_decode_table_provider(
616            &self,
617            _buf: &[u8],
618            _table_ref: &TableReference,
619            _schema: arrow::datatypes::SchemaRef,
620            _ctx: &TaskContext,
621        ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>>
622        {
623            not_impl_err!("Method not implemented")
624        }
625
626        fn try_encode_table_provider(
627            &self,
628            _table_ref: &TableReference,
629            _node: Arc<dyn datafusion_catalog::TableProvider>,
630            _buf: &mut Vec<u8>,
631        ) -> datafusion_common::Result<()> {
632            not_impl_err!("Method not implemented")
633        }
634
635        fn try_decode_file_format(
636            &self,
637            buf: &[u8],
638            _ctx: &TaskContext,
639        ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
640            let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
641                exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
642            })?;
643            let options: TableParquetOptions = (&proto).into();
644            Ok(Arc::new(
645                datafusion_datasource_parquet::file_format::ParquetFormatFactory {
646                    options: Some(options),
647                },
648            ))
649        }
650
651        fn try_encode_file_format(
652            &self,
653            buf: &mut Vec<u8>,
654            node: Arc<dyn FileFormatFactory>,
655        ) -> datafusion_common::Result<()> {
656            use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
657
658            let options = if let Some(parquet_factory) =
659                node.as_any().downcast_ref::<ParquetFormatFactory>()
660            {
661                parquet_factory.options.clone().unwrap_or_default()
662            } else {
663                return exec_err!("Unsupported FileFormatFactory type");
664            };
665
666            let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
667                options: Some(options),
668            });
669
670            proto.encode(buf).map_err(|e| {
671                exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
672            })?;
673
674            Ok(())
675        }
676    }
677}
678#[cfg(feature = "parquet")]
679pub use parquet::ParquetLogicalExtensionCodec;
680
/// Extension codec for the Arrow IPC file format. The format carries no
/// options, so (de)serialization is trivial; all plan/table-provider codec
/// methods are placeholders (`not_impl_err!`).
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
683
684// TODO! This is a placeholder for now and needs to be implemented for real.
685impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
686    fn try_decode(
687        &self,
688        _buf: &[u8],
689        _inputs: &[datafusion_expr::LogicalPlan],
690        _ctx: &TaskContext,
691    ) -> datafusion_common::Result<datafusion_expr::Extension> {
692        not_impl_err!("Method not implemented")
693    }
694
695    fn try_encode(
696        &self,
697        _node: &datafusion_expr::Extension,
698        _buf: &mut Vec<u8>,
699    ) -> datafusion_common::Result<()> {
700        not_impl_err!("Method not implemented")
701    }
702
703    fn try_decode_table_provider(
704        &self,
705        _buf: &[u8],
706        _table_ref: &TableReference,
707        _schema: arrow::datatypes::SchemaRef,
708        _ctx: &TaskContext,
709    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
710        not_impl_err!("Method not implemented")
711    }
712
713    fn try_encode_table_provider(
714        &self,
715        _table_ref: &TableReference,
716        _node: Arc<dyn datafusion_catalog::TableProvider>,
717        _buf: &mut Vec<u8>,
718    ) -> datafusion_common::Result<()> {
719        not_impl_err!("Method not implemented")
720    }
721
722    fn try_decode_file_format(
723        &self,
724        __buf: &[u8],
725        __ctx: &TaskContext,
726    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
727        Ok(Arc::new(ArrowFormatFactory::new()))
728    }
729
730    fn try_encode_file_format(
731        &self,
732        __buf: &mut Vec<u8>,
733        __node: Arc<dyn FileFormatFactory>,
734    ) -> datafusion_common::Result<()> {
735        Ok(())
736    }
737}
738
/// Extension codec for the Avro file format. All plan/table-provider codec
/// methods are placeholders (`not_impl_err!`).
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
741
742// TODO! This is a placeholder for now and needs to be implemented for real.
743impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
744    fn try_decode(
745        &self,
746        _buf: &[u8],
747        _inputs: &[datafusion_expr::LogicalPlan],
748        _ctx: &TaskContext,
749    ) -> datafusion_common::Result<datafusion_expr::Extension> {
750        not_impl_err!("Method not implemented")
751    }
752
753    fn try_encode(
754        &self,
755        _node: &datafusion_expr::Extension,
756        _buf: &mut Vec<u8>,
757    ) -> datafusion_common::Result<()> {
758        not_impl_err!("Method not implemented")
759    }
760
761    fn try_decode_table_provider(
762        &self,
763        _buf: &[u8],
764        _table_ref: &TableReference,
765        _schema: arrow::datatypes::SchemaRef,
766        _cts: &TaskContext,
767    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
768        not_impl_err!("Method not implemented")
769    }
770
771    fn try_encode_table_provider(
772        &self,
773        _table_ref: &TableReference,
774        _node: Arc<dyn datafusion_catalog::TableProvider>,
775        _buf: &mut Vec<u8>,
776    ) -> datafusion_common::Result<()> {
777        not_impl_err!("Method not implemented")
778    }
779
780    fn try_decode_file_format(
781        &self,
782        __buf: &[u8],
783        __ctx: &TaskContext,
784    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
785        Ok(Arc::new(ArrowFormatFactory::new()))
786    }
787
788    fn try_encode_file_format(
789        &self,
790        __buf: &mut Vec<u8>,
791        __node: Arc<dyn FileFormatFactory>,
792    ) -> datafusion_common::Result<()> {
793        Ok(())
794    }
795}