Skip to main content

datafusion_proto/logical_plan/
file_formats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use super::LogicalExtensionCodec;
21use crate::protobuf::{
22    CsvOptions as CsvOptionsProto, CsvQuoteStyle as CsvQuoteStyleProto,
23    JsonOptions as JsonOptionsProto,
24};
25use datafusion_common::config::{CsvOptions, JsonOptions};
26use datafusion_common::{
27    TableReference, exec_datafusion_err, exec_err, not_impl_err,
28    parsers::{CompressionTypeVariant, CsvQuoteStyle},
29};
30use datafusion_datasource::file_format::FileFormatFactory;
31use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
32use datafusion_datasource_csv::file_format::CsvFormatFactory;
33use datafusion_datasource_json::file_format::JsonFormatFactory;
34use datafusion_execution::TaskContext;
35use prost::Message;
36
/// [`LogicalExtensionCodec`] for the CSV file format.
///
/// Only the file-format hooks do real work: the options of a [`CsvFormatFactory`]
/// are serialized to / deserialized from a protobuf `CsvOptions` message. The other
/// codec methods currently return a "not implemented" error.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
39
40impl CsvOptionsProto {
41    fn from_factory(factory: &CsvFormatFactory) -> Self {
42        if let Some(options) = &factory.options {
43            CsvOptionsProto {
44                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
45                delimiter: vec![options.delimiter],
46                quote: vec![options.quote],
47                terminator: options.terminator.map_or(vec![], |v| vec![v]),
48                escape: options.escape.map_or(vec![], |v| vec![v]),
49                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
50                compression: options.compression as i32,
51                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
52                date_format: options.date_format.clone().unwrap_or_default(),
53                datetime_format: options.datetime_format.clone().unwrap_or_default(),
54                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
55                timestamp_tz_format: options
56                    .timestamp_tz_format
57                    .clone()
58                    .unwrap_or_default(),
59                time_format: options.time_format.clone().unwrap_or_default(),
60                null_value: options.null_value.clone().unwrap_or_default(),
61                null_regex: options.null_regex.clone().unwrap_or_default(),
62                comment: options.comment.map_or(vec![], |v| vec![v]),
63                newlines_in_values: options
64                    .newlines_in_values
65                    .map_or(vec![], |v| vec![v as u8]),
66                truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
67                compression_level: options.compression_level,
68                quote_style: options.quote_style as i32,
69                ignore_leading_whitespace: options
70                    .ignore_leading_whitespace
71                    .map_or(vec![], |v| vec![v as u8]),
72                ignore_trailing_whitespace: options
73                    .ignore_trailing_whitespace
74                    .map_or(vec![], |v| vec![v as u8]),
75            }
76        } else {
77            CsvOptionsProto::default()
78        }
79    }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83    fn from(proto: &CsvOptionsProto) -> Self {
84        CsvOptions {
85            has_header: if !proto.has_header.is_empty() {
86                Some(proto.has_header[0] != 0)
87            } else {
88                None
89            },
90            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91            quote: proto.quote.first().copied().unwrap_or(b'"'),
92            terminator: if !proto.terminator.is_empty() {
93                Some(proto.terminator[0])
94            } else {
95                None
96            },
97            escape: if !proto.escape.is_empty() {
98                Some(proto.escape[0])
99            } else {
100                None
101            },
102            double_quote: if !proto.double_quote.is_empty() {
103                Some(proto.double_quote[0] != 0)
104            } else {
105                None
106            },
107            compression: match proto.compression {
108                0 => CompressionTypeVariant::GZIP,
109                1 => CompressionTypeVariant::BZIP2,
110                2 => CompressionTypeVariant::XZ,
111                3 => CompressionTypeVariant::ZSTD,
112                _ => CompressionTypeVariant::UNCOMPRESSED,
113            },
114            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115            date_format: if proto.date_format.is_empty() {
116                None
117            } else {
118                Some(proto.date_format.clone())
119            },
120            datetime_format: if proto.datetime_format.is_empty() {
121                None
122            } else {
123                Some(proto.datetime_format.clone())
124            },
125            timestamp_format: if proto.timestamp_format.is_empty() {
126                None
127            } else {
128                Some(proto.timestamp_format.clone())
129            },
130            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131                None
132            } else {
133                Some(proto.timestamp_tz_format.clone())
134            },
135            time_format: if proto.time_format.is_empty() {
136                None
137            } else {
138                Some(proto.time_format.clone())
139            },
140            null_value: if proto.null_value.is_empty() {
141                None
142            } else {
143                Some(proto.null_value.clone())
144            },
145            null_regex: if proto.null_regex.is_empty() {
146                None
147            } else {
148                Some(proto.null_regex.clone())
149            },
150            comment: if !proto.comment.is_empty() {
151                Some(proto.comment[0])
152            } else {
153                None
154            },
155            newlines_in_values: if proto.newlines_in_values.is_empty() {
156                None
157            } else {
158                Some(proto.newlines_in_values[0] != 0)
159            },
160            truncated_rows: if proto.truncated_rows.is_empty() {
161                None
162            } else {
163                Some(proto.truncated_rows[0] != 0)
164            },
165            compression_level: proto.compression_level,
166            quote_style: match CsvQuoteStyleProto::try_from(proto.quote_style) {
167                Ok(CsvQuoteStyleProto::Always) => CsvQuoteStyle::Always,
168                Ok(CsvQuoteStyleProto::NonNumeric) => CsvQuoteStyle::NonNumeric,
169                Ok(CsvQuoteStyleProto::Never) => CsvQuoteStyle::Never,
170                Ok(CsvQuoteStyleProto::Necessary) => CsvQuoteStyle::Necessary,
171                _ => CsvQuoteStyle::Necessary,
172            },
173            ignore_leading_whitespace: if proto.ignore_leading_whitespace.is_empty() {
174                None
175            } else {
176                Some(proto.ignore_leading_whitespace[0] != 0)
177            },
178            ignore_trailing_whitespace: if proto.ignore_trailing_whitespace.is_empty() {
179                None
180            } else {
181                Some(proto.ignore_trailing_whitespace[0] != 0)
182            },
183        }
184    }
185}
186
187// TODO! This is a placeholder for now and needs to be implemented for real.
188impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
189    fn try_decode(
190        &self,
191        _buf: &[u8],
192        _inputs: &[datafusion_expr::LogicalPlan],
193        _ctx: &TaskContext,
194    ) -> datafusion_common::Result<datafusion_expr::Extension> {
195        not_impl_err!("Method not implemented")
196    }
197
198    fn try_encode(
199        &self,
200        _node: &datafusion_expr::Extension,
201        _buf: &mut Vec<u8>,
202    ) -> datafusion_common::Result<()> {
203        not_impl_err!("Method not implemented")
204    }
205
206    fn try_decode_table_provider(
207        &self,
208        _buf: &[u8],
209        _table_ref: &TableReference,
210        _schema: arrow::datatypes::SchemaRef,
211        _ctx: &TaskContext,
212    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
213        not_impl_err!("Method not implemented")
214    }
215
216    fn try_encode_table_provider(
217        &self,
218        _table_ref: &TableReference,
219        _node: Arc<dyn datafusion_catalog::TableProvider>,
220        _buf: &mut Vec<u8>,
221    ) -> datafusion_common::Result<()> {
222        not_impl_err!("Method not implemented")
223    }
224
225    fn try_decode_file_format(
226        &self,
227        buf: &[u8],
228        _ctx: &TaskContext,
229    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
230        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
231            exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
232        })?;
233        let options: CsvOptions = (&proto).into();
234        Ok(Arc::new(CsvFormatFactory {
235            options: Some(options),
236        }))
237    }
238
239    fn try_encode_file_format(
240        &self,
241        buf: &mut Vec<u8>,
242        node: Arc<dyn FileFormatFactory>,
243    ) -> datafusion_common::Result<()> {
244        let options = if let Some(csv_factory) = node.downcast_ref::<CsvFormatFactory>() {
245            csv_factory.options.clone().unwrap_or_default()
246        } else {
247            return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
248        };
249
250        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
251            options: Some(options),
252        });
253
254        proto
255            .encode(buf)
256            .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;
257
258        Ok(())
259    }
260}
261
262impl JsonOptionsProto {
263    fn from_factory(factory: &JsonFormatFactory) -> Self {
264        if let Some(options) = &factory.options {
265            JsonOptionsProto {
266                compression: options.compression as i32,
267                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
268                compression_level: options.compression_level,
269                newline_delimited: Some(options.newline_delimited),
270            }
271        } else {
272            JsonOptionsProto::default()
273        }
274    }
275}
276
277impl From<&JsonOptionsProto> for JsonOptions {
278    fn from(proto: &JsonOptionsProto) -> Self {
279        JsonOptions {
280            compression: match proto.compression {
281                0 => CompressionTypeVariant::GZIP,
282                1 => CompressionTypeVariant::BZIP2,
283                2 => CompressionTypeVariant::XZ,
284                3 => CompressionTypeVariant::ZSTD,
285                _ => CompressionTypeVariant::UNCOMPRESSED,
286            },
287            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
288            compression_level: proto.compression_level,
289            newline_delimited: proto.newline_delimited.unwrap_or(true),
290        }
291    }
292}
293
/// [`LogicalExtensionCodec`] for the JSON file format.
///
/// Only the file-format hooks do real work: the options of a [`JsonFormatFactory`]
/// are serialized to / deserialized from a protobuf `JsonOptions` message. The other
/// codec methods currently return a "not implemented" error.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
296
297// TODO! This is a placeholder for now and needs to be implemented for real.
298impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
299    fn try_decode(
300        &self,
301        _buf: &[u8],
302        _inputs: &[datafusion_expr::LogicalPlan],
303        _ctx: &TaskContext,
304    ) -> datafusion_common::Result<datafusion_expr::Extension> {
305        not_impl_err!("Method not implemented")
306    }
307
308    fn try_encode(
309        &self,
310        _node: &datafusion_expr::Extension,
311        _buf: &mut Vec<u8>,
312    ) -> datafusion_common::Result<()> {
313        not_impl_err!("Method not implemented")
314    }
315
316    fn try_decode_table_provider(
317        &self,
318        _buf: &[u8],
319        _table_ref: &TableReference,
320        _schema: arrow::datatypes::SchemaRef,
321        _ctx: &TaskContext,
322    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
323        not_impl_err!("Method not implemented")
324    }
325
326    fn try_encode_table_provider(
327        &self,
328        _table_ref: &TableReference,
329        _node: Arc<dyn datafusion_catalog::TableProvider>,
330        _buf: &mut Vec<u8>,
331    ) -> datafusion_common::Result<()> {
332        not_impl_err!("Method not implemented")
333    }
334
335    fn try_decode_file_format(
336        &self,
337        buf: &[u8],
338        _ctx: &TaskContext,
339    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
340        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
341            exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
342        })?;
343        let options: JsonOptions = (&proto).into();
344        Ok(Arc::new(JsonFormatFactory {
345            options: Some(options),
346        }))
347    }
348
349    fn try_encode_file_format(
350        &self,
351        buf: &mut Vec<u8>,
352        node: Arc<dyn FileFormatFactory>,
353    ) -> datafusion_common::Result<()> {
354        let options = if let Some(json_factory) = node.downcast_ref::<JsonFormatFactory>()
355        {
356            json_factory.options.clone().unwrap_or_default()
357        } else {
358            return exec_err!("Unsupported FileFormatFactory type");
359        };
360
361        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
362            options: Some(options),
363        });
364
365        proto
366            .encode(buf)
367            .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;
368
369        Ok(())
370    }
371}
372
#[cfg(feature = "parquet")]
mod parquet {
    use super::*;

    use crate::protobuf::{
        ParquetCdcOptions as ParquetCdcOptionsProto,
        ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
        ParquetOptions as ParquetOptionsProto,
        TableParquetOptions as TableParquetOptionsProto, parquet_column_options,
        parquet_options,
    };
    use datafusion_common::config::{
        ParquetCdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions,
    };
    use datafusion_datasource_parquet::file_format::ParquetFormatFactory;

    impl TableParquetOptionsProto {
        /// Encodes the options carried by a [`ParquetFormatFactory`] into protobuf form.
        ///
        /// Encoding rules:
        /// - A factory without options is encoded as an empty message.
        /// - Only `global`, `column_specific_options` and `key_value_metadata` are
        ///   encoded.
        /// - `key_value_metadata` entries whose value is `None` are dropped, because
        ///   the protobuf map stores plain strings.
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            let Some(options) = factory.options.clone() else {
                return TableParquetOptionsProto::default();
            };
            // Take ownership of the pieces we encode so they can be moved instead of
            // cloned a second time.
            let TableParquetOptions {
                global,
                column_specific_options,
                key_value_metadata,
                ..
            } = options;

            let content_defined_chunking = ParquetCdcOptionsProto {
                enabled: global.content_defined_chunking.enabled,
                min_chunk_size: global.content_defined_chunking.min_chunk_size as u64,
                max_chunk_size: global.content_defined_chunking.max_chunk_size as u64,
                norm_level: global.content_defined_chunking.norm_level,
            };

            TableParquetOptionsProto {
                global: Some(ParquetOptionsProto {
                    enable_page_index: global.enable_page_index,
                    pruning: global.pruning,
                    skip_metadata: global.skip_metadata,
                    metadata_size_hint_opt: global.metadata_size_hint.map(|size| {
                        parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                    }),
                    pushdown_filters: global.pushdown_filters,
                    reorder_filters: global.reorder_filters,
                    force_filter_selections: global.force_filter_selections,
                    data_pagesize_limit: global.data_pagesize_limit as u64,
                    write_batch_size: global.write_batch_size as u64,
                    writer_version: global.writer_version.to_string(),
                    compression_opt: global
                        .compression
                        .map(parquet_options::CompressionOpt::Compression),
                    dictionary_enabled_opt: global
                        .dictionary_enabled
                        .map(parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
                    dictionary_page_size_limit: global.dictionary_page_size_limit as u64,
                    statistics_enabled_opt: global
                        .statistics_enabled
                        .map(parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
                    max_row_group_size: global.max_row_group_size as u64,
                    created_by: global.created_by,
                    column_index_truncate_length_opt: global
                        .column_index_truncate_length
                        .map(|length| {
                            parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(
                                length as u64,
                            )
                        }),
                    statistics_truncate_length_opt: global.statistics_truncate_length.map(
                        |length| {
                            parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(
                                length as u64,
                            )
                        },
                    ),
                    data_page_row_count_limit: global.data_page_row_count_limit as u64,
                    encoding_opt: global.encoding.map(parquet_options::EncodingOpt::Encoding),
                    bloom_filter_on_read: global.bloom_filter_on_read,
                    bloom_filter_on_write: global.bloom_filter_on_write,
                    bloom_filter_fpp_opt: global
                        .bloom_filter_fpp
                        .map(parquet_options::BloomFilterFppOpt::BloomFilterFpp),
                    bloom_filter_ndv_opt: global
                        .bloom_filter_ndv
                        .map(parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
                    allow_single_file_parallelism: global.allow_single_file_parallelism,
                    maximum_parallel_row_group_writers: global
                        .maximum_parallel_row_group_writers
                        as u64,
                    maximum_buffered_record_batches_per_stream: global
                        .maximum_buffered_record_batches_per_stream
                        as u64,
                    schema_force_view_types: global.schema_force_view_types,
                    binary_as_string: global.binary_as_string,
                    skip_arrow_metadata: global.skip_arrow_metadata,
                    coerce_int96_opt: global
                        .coerce_int96
                        .map(parquet_options::CoerceInt96Opt::CoerceInt96),
                    coerce_int96_tz_opt: global
                        .coerce_int96_tz
                        .map(parquet_options::CoerceInt96TzOpt::CoerceInt96Tz),
                    max_predicate_cache_size_opt: global.max_predicate_cache_size.map(
                        |size| {
                            parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(
                                size as u64,
                            )
                        },
                    ),
                    content_defined_chunking: Some(content_defined_chunking),
                }),
                column_specific_options: column_specific_options
                    .into_iter()
                    .map(|(column_name, column_options)| ParquetColumnSpecificOptions {
                        column_name,
                        options: Some(ParquetColumnOptionsProto {
                            bloom_filter_enabled_opt: column_options.bloom_filter_enabled.map(
                                parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled,
                            ),
                            encoding_opt: column_options
                                .encoding
                                .map(parquet_column_options::EncodingOpt::Encoding),
                            dictionary_enabled_opt: column_options.dictionary_enabled.map(
                                parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled,
                            ),
                            compression_opt: column_options
                                .compression
                                .map(parquet_column_options::CompressionOpt::Compression),
                            statistics_enabled_opt: column_options.statistics_enabled.map(
                                parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled,
                            ),
                            bloom_filter_fpp_opt: column_options
                                .bloom_filter_fpp
                                .map(parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
                            bloom_filter_ndv_opt: column_options
                                .bloom_filter_ndv
                                .map(parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
                        }),
                    })
                    .collect(),
                key_value_metadata: key_value_metadata
                    .into_iter()
                    .filter_map(|(key, value)| value.map(|value| (key, value)))
                    .collect(),
            }
        }
    }

    /// Fallible conversion from protobuf Parquet options.
    ///
    /// # Errors
    /// Returns an execution error when `writer_version` cannot be parsed (for example
    /// when it is neither `"1.0"` nor `"2.0"`). The bytes may come from an untrusted
    /// source, so this must not panic.
    fn parquet_options_from_proto(
        proto: &ParquetOptionsProto,
    ) -> datafusion_common::Result<ParquetOptions> {
        Ok(ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            force_filter_selections: proto.force_filter_selections,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.parse().map_err(|e| {
                let version = &proto.writer_version;
                exec_datafusion_err!(
                    "Invalid parquet writer version in proto: {version:?}, expected '1.0' or '2.0' ({e:?})"
                )
            })?,
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => {
                    statistics.clone()
                }
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto
                .column_index_truncate_length_opt
                .as_ref()
                .map(|opt| match opt {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(
                        length,
                    ) => *length as usize,
                }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(
                |opt| match opt {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(
                        length,
                    ) => *length as usize,
                },
            ),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers
                as usize,
            maximum_buffered_record_batches_per_stream: proto
                .maximum_buffered_record_batches_per_stream
                as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => {
                    coerce_int96.clone()
                }
            }),
            coerce_int96_tz: proto.coerce_int96_tz_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => tz.clone(),
            }),
            max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(
                |opt| match opt {
                    parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => {
                        *size as usize
                    }
                },
            ),
            content_defined_chunking: proto
                .content_defined_chunking
                .as_ref()
                .map(|cdc| ParquetCdcOptions {
                    enabled: cdc.enabled,
                    min_chunk_size: cdc.min_chunk_size as usize,
                    max_chunk_size: cdc.max_chunk_size as usize,
                    norm_level: cdc.norm_level,
                })
                .unwrap_or_default(),
        })
    }

    impl From<&ParquetOptionsProto> for ParquetOptions {
        /// Infallible conversion kept for existing callers.
        ///
        /// # Panics
        /// Panics if `writer_version` is invalid. Decoding paths in this module use
        /// the fallible `parquet_options_from_proto` instead.
        fn from(proto: &ParquetOptionsProto) -> Self {
            parquet_options_from_proto(proto)
                .expect("Invalid parquet writer version in proto, expected '1.0' or '2.0'")
        }
    }

    impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
        fn from(proto: ParquetColumnOptionsProto) -> Self {
            ParquetColumnOptions {
                bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
                    |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
                ),
                encoding: proto
                    .encoding_opt
                    .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
                dictionary_enabled: proto.dictionary_enabled_opt.map(
                    |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
                ),
                compression: proto
                    .compression_opt
                    .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
                statistics_enabled: proto.statistics_enabled_opt.map(
                    |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
                ),
                bloom_filter_fpp: proto
                    .bloom_filter_fpp_opt
                    .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
                bloom_filter_ndv: proto
                    .bloom_filter_ndv_opt
                    .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
            }
        }
    }

    /// Fallible conversion from protobuf table-level Parquet options.
    ///
    /// Conversion rules:
    /// - A missing `global` message yields the default [`ParquetOptions`].
    /// - Missing per-column options yield default column options.
    /// - Fields not carried by the protobuf message take their `Default` values.
    ///
    /// # Errors
    /// Propagates the error from `parquet_options_from_proto`.
    fn table_parquet_options_from_proto(
        proto: &TableParquetOptionsProto,
    ) -> datafusion_common::Result<TableParquetOptions> {
        let global = proto
            .global
            .as_ref()
            .map(parquet_options_from_proto)
            .transpose()?
            .unwrap_or_default();

        Ok(TableParquetOptions {
            global,
            column_specific_options: proto
                .column_specific_options
                .iter()
                .map(|parquet_column_options| {
                    (
                        parquet_column_options.column_name.clone(),
                        ParquetColumnOptions::from(
                            parquet_column_options
                                .options
                                .clone()
                                .unwrap_or_default(),
                        ),
                    )
                })
                .collect(),
            key_value_metadata: proto
                .key_value_metadata
                .iter()
                .map(|(k, v)| (k.clone(), Some(v.clone())))
                .collect(),
            ..Default::default()
        })
    }

    impl From<&TableParquetOptionsProto> for TableParquetOptions {
        /// Infallible conversion kept for existing callers.
        ///
        /// # Panics
        /// Panics if the global `writer_version` is invalid.
        fn from(proto: &TableParquetOptionsProto) -> Self {
            table_parquet_options_from_proto(proto)
                .expect("Invalid parquet writer version in proto, expected '1.0' or '2.0'")
        }
    }

    /// [`LogicalExtensionCodec`] for the Parquet file format.
    ///
    /// Only the file-format hooks do real work, (de)serializing
    /// [`ParquetFormatFactory`] options as a protobuf `TableParquetOptions` message.
    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;

    // TODO! This is a placeholder for now and needs to be implemented for real.
    impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[datafusion_expr::LogicalPlan],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<datafusion_expr::Extension> {
            not_impl_err!("Method not implemented")
        }

        fn try_encode(
            &self,
            _node: &datafusion_expr::Extension,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_table_provider(
            &self,
            _buf: &[u8],
            _table_ref: &TableReference,
            _schema: arrow::datatypes::SchemaRef,
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>>
        {
            not_impl_err!("Method not implemented")
        }

        fn try_encode_table_provider(
            &self,
            _table_ref: &TableReference,
            _node: Arc<dyn datafusion_catalog::TableProvider>,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        /// Decodes a protobuf `TableParquetOptions` message into a
        /// [`ParquetFormatFactory`].
        ///
        /// # Errors
        /// Returns an execution error if `buf` is not a valid message or carries an
        /// invalid writer version. Invalid input never causes a panic here.
        fn try_decode_file_format(
            &self,
            buf: &[u8],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
            let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
            })?;
            let options = table_parquet_options_from_proto(&proto)?;
            Ok(Arc::new(ParquetFormatFactory {
                options: Some(options),
            }))
        }

        /// Encodes the options of a [`ParquetFormatFactory`] into `buf`.
        ///
        /// A factory without options is encoded as `TableParquetOptions::default()`.
        ///
        /// # Errors
        /// Returns an execution error if `node` is not a [`ParquetFormatFactory`], or
        /// if protobuf encoding fails.
        fn try_encode_file_format(
            &self,
            buf: &mut Vec<u8>,
            node: Arc<dyn FileFormatFactory>,
        ) -> datafusion_common::Result<()> {
            let Some(parquet_factory) = node.downcast_ref::<ParquetFormatFactory>() else {
                return exec_err!("Unsupported FileFormatFactory type");
            };

            let options = parquet_factory.options.clone().unwrap_or_default();
            let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
                options: Some(options),
            });

            proto.encode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
            })?;

            Ok(())
        }
    }
}
721#[cfg(feature = "parquet")]
722pub use parquet::ParquetLogicalExtensionCodec;
723
724#[derive(Debug)]
725pub struct ArrowLogicalExtensionCodec;
726
727// TODO! This is a placeholder for now and needs to be implemented for real.
728impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
729    fn try_decode(
730        &self,
731        _buf: &[u8],
732        _inputs: &[datafusion_expr::LogicalPlan],
733        _ctx: &TaskContext,
734    ) -> datafusion_common::Result<datafusion_expr::Extension> {
735        not_impl_err!("Method not implemented")
736    }
737
738    fn try_encode(
739        &self,
740        _node: &datafusion_expr::Extension,
741        _buf: &mut Vec<u8>,
742    ) -> datafusion_common::Result<()> {
743        not_impl_err!("Method not implemented")
744    }
745
746    fn try_decode_table_provider(
747        &self,
748        _buf: &[u8],
749        _table_ref: &TableReference,
750        _schema: arrow::datatypes::SchemaRef,
751        _ctx: &TaskContext,
752    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
753        not_impl_err!("Method not implemented")
754    }
755
756    fn try_encode_table_provider(
757        &self,
758        _table_ref: &TableReference,
759        _node: Arc<dyn datafusion_catalog::TableProvider>,
760        _buf: &mut Vec<u8>,
761    ) -> datafusion_common::Result<()> {
762        not_impl_err!("Method not implemented")
763    }
764
765    fn try_decode_file_format(
766        &self,
767        __buf: &[u8],
768        __ctx: &TaskContext,
769    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
770        Ok(Arc::new(ArrowFormatFactory::new()))
771    }
772
773    fn try_encode_file_format(
774        &self,
775        __buf: &mut Vec<u8>,
776        __node: Arc<dyn FileFormatFactory>,
777    ) -> datafusion_common::Result<()> {
778        Ok(())
779    }
780}
781
782#[derive(Debug)]
783pub struct AvroLogicalExtensionCodec;
784
785// TODO! This is a placeholder for now and needs to be implemented for real.
786impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
787    fn try_decode(
788        &self,
789        _buf: &[u8],
790        _inputs: &[datafusion_expr::LogicalPlan],
791        _ctx: &TaskContext,
792    ) -> datafusion_common::Result<datafusion_expr::Extension> {
793        not_impl_err!("Method not implemented")
794    }
795
796    fn try_encode(
797        &self,
798        _node: &datafusion_expr::Extension,
799        _buf: &mut Vec<u8>,
800    ) -> datafusion_common::Result<()> {
801        not_impl_err!("Method not implemented")
802    }
803
804    fn try_decode_table_provider(
805        &self,
806        _buf: &[u8],
807        _table_ref: &TableReference,
808        _schema: arrow::datatypes::SchemaRef,
809        _cts: &TaskContext,
810    ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
811        not_impl_err!("Method not implemented")
812    }
813
814    fn try_encode_table_provider(
815        &self,
816        _table_ref: &TableReference,
817        _node: Arc<dyn datafusion_catalog::TableProvider>,
818        _buf: &mut Vec<u8>,
819    ) -> datafusion_common::Result<()> {
820        not_impl_err!("Method not implemented")
821    }
822
823    fn try_decode_file_format(
824        &self,
825        __buf: &[u8],
826        __ctx: &TaskContext,
827    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
828        Ok(Arc::new(ArrowFormatFactory::new()))
829    }
830
831    fn try_encode_file_format(
832        &self,
833        __buf: &mut Vec<u8>,
834        __node: Arc<dyn FileFormatFactory>,
835    ) -> datafusion_common::Result<()> {
836        Ok(())
837    }
838}