// datafusion_proto/logical_plan/file_formats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::{
21    config::{
22        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23        TableParquetOptions,
24    },
25    datasource::file_format::{
26        arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27        parquet::ParquetFormatFactory, FileFormatFactory,
28    },
29    prelude::SessionContext,
30};
31use datafusion_common::{
32    exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33    TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38    parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39    JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40    ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41    TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// Extension codec that (de)serializes only the CSV file-format options;
/// the plan and table-provider methods below are unimplemented placeholders.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50    fn from_factory(factory: &CsvFormatFactory) -> Self {
51        if let Some(options) = &factory.options {
52            CsvOptionsProto {
53                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54                delimiter: vec![options.delimiter],
55                quote: vec![options.quote],
56                terminator: options.terminator.map_or(vec![], |v| vec![v]),
57                escape: options.escape.map_or(vec![], |v| vec![v]),
58                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59                compression: options.compression as i32,
60                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61                date_format: options.date_format.clone().unwrap_or_default(),
62                datetime_format: options.datetime_format.clone().unwrap_or_default(),
63                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64                timestamp_tz_format: options
65                    .timestamp_tz_format
66                    .clone()
67                    .unwrap_or_default(),
68                time_format: options.time_format.clone().unwrap_or_default(),
69                null_value: options.null_value.clone().unwrap_or_default(),
70                null_regex: options.null_regex.clone().unwrap_or_default(),
71                comment: options.comment.map_or(vec![], |v| vec![v]),
72                newlines_in_values: options
73                    .newlines_in_values
74                    .map_or(vec![], |v| vec![v as u8]),
75            }
76        } else {
77            CsvOptionsProto::default()
78        }
79    }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83    fn from(proto: &CsvOptionsProto) -> Self {
84        CsvOptions {
85            has_header: if !proto.has_header.is_empty() {
86                Some(proto.has_header[0] != 0)
87            } else {
88                None
89            },
90            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91            quote: proto.quote.first().copied().unwrap_or(b'"'),
92            terminator: if !proto.terminator.is_empty() {
93                Some(proto.terminator[0])
94            } else {
95                None
96            },
97            escape: if !proto.escape.is_empty() {
98                Some(proto.escape[0])
99            } else {
100                None
101            },
102            double_quote: if !proto.double_quote.is_empty() {
103                Some(proto.double_quote[0] != 0)
104            } else {
105                None
106            },
107            compression: match proto.compression {
108                0 => CompressionTypeVariant::GZIP,
109                1 => CompressionTypeVariant::BZIP2,
110                2 => CompressionTypeVariant::XZ,
111                3 => CompressionTypeVariant::ZSTD,
112                _ => CompressionTypeVariant::UNCOMPRESSED,
113            },
114            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115            date_format: if proto.date_format.is_empty() {
116                None
117            } else {
118                Some(proto.date_format.clone())
119            },
120            datetime_format: if proto.datetime_format.is_empty() {
121                None
122            } else {
123                Some(proto.datetime_format.clone())
124            },
125            timestamp_format: if proto.timestamp_format.is_empty() {
126                None
127            } else {
128                Some(proto.timestamp_format.clone())
129            },
130            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131                None
132            } else {
133                Some(proto.timestamp_tz_format.clone())
134            },
135            time_format: if proto.time_format.is_empty() {
136                None
137            } else {
138                Some(proto.time_format.clone())
139            },
140            null_value: if proto.null_value.is_empty() {
141                None
142            } else {
143                Some(proto.null_value.clone())
144            },
145            null_regex: if proto.null_regex.is_empty() {
146                None
147            } else {
148                Some(proto.null_regex.clone())
149            },
150            comment: if !proto.comment.is_empty() {
151                Some(proto.comment[0])
152            } else {
153                None
154            },
155            newlines_in_values: if proto.newlines_in_values.is_empty() {
156                None
157            } else {
158                Some(proto.newlines_in_values[0] != 0)
159            },
160        }
161    }
162}
163
164// TODO! This is a placeholder for now and needs to be implemented for real.
165impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166    fn try_decode(
167        &self,
168        _buf: &[u8],
169        _inputs: &[datafusion_expr::LogicalPlan],
170        _ctx: &SessionContext,
171    ) -> datafusion_common::Result<datafusion_expr::Extension> {
172        not_impl_err!("Method not implemented")
173    }
174
175    fn try_encode(
176        &self,
177        _node: &datafusion_expr::Extension,
178        _buf: &mut Vec<u8>,
179    ) -> datafusion_common::Result<()> {
180        not_impl_err!("Method not implemented")
181    }
182
183    fn try_decode_table_provider(
184        &self,
185        _buf: &[u8],
186        _table_ref: &TableReference,
187        _schema: arrow::datatypes::SchemaRef,
188        _ctx: &SessionContext,
189    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190        not_impl_err!("Method not implemented")
191    }
192
193    fn try_encode_table_provider(
194        &self,
195        _table_ref: &TableReference,
196        _node: Arc<dyn datafusion::datasource::TableProvider>,
197        _buf: &mut Vec<u8>,
198    ) -> datafusion_common::Result<()> {
199        not_impl_err!("Method not implemented")
200    }
201
202    fn try_decode_file_format(
203        &self,
204        buf: &[u8],
205        _ctx: &SessionContext,
206    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208            DataFusionError::Execution(format!(
209                "Failed to decode CsvOptionsProto: {:?}",
210                e
211            ))
212        })?;
213        let options: CsvOptions = (&proto).into();
214        Ok(Arc::new(CsvFormatFactory {
215            options: Some(options),
216        }))
217    }
218
219    fn try_encode_file_format(
220        &self,
221        buf: &mut Vec<u8>,
222        node: Arc<dyn FileFormatFactory>,
223    ) -> datafusion_common::Result<()> {
224        let options =
225            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
226                csv_factory.options.clone().unwrap_or_default()
227            } else {
228                return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
229            };
230
231        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
232            options: Some(options),
233        });
234
235        proto.encode(buf).map_err(|e| {
236            DataFusionError::Execution(format!("Failed to encode CsvOptions: {:?}", e))
237        })?;
238
239        Ok(())
240    }
241}
242
243impl JsonOptionsProto {
244    fn from_factory(factory: &JsonFormatFactory) -> Self {
245        if let Some(options) = &factory.options {
246            JsonOptionsProto {
247                compression: options.compression as i32,
248                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
249            }
250        } else {
251            JsonOptionsProto::default()
252        }
253    }
254}
255
256impl From<&JsonOptionsProto> for JsonOptions {
257    fn from(proto: &JsonOptionsProto) -> Self {
258        JsonOptions {
259            compression: match proto.compression {
260                0 => CompressionTypeVariant::GZIP,
261                1 => CompressionTypeVariant::BZIP2,
262                2 => CompressionTypeVariant::XZ,
263                3 => CompressionTypeVariant::ZSTD,
264                _ => CompressionTypeVariant::UNCOMPRESSED,
265            },
266            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
267        }
268    }
269}
270
/// Extension codec that (de)serializes only the JSON file-format options;
/// the plan and table-provider methods below are unimplemented placeholders.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
273
274// TODO! This is a placeholder for now and needs to be implemented for real.
275impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
276    fn try_decode(
277        &self,
278        _buf: &[u8],
279        _inputs: &[datafusion_expr::LogicalPlan],
280        _ctx: &SessionContext,
281    ) -> datafusion_common::Result<datafusion_expr::Extension> {
282        not_impl_err!("Method not implemented")
283    }
284
285    fn try_encode(
286        &self,
287        _node: &datafusion_expr::Extension,
288        _buf: &mut Vec<u8>,
289    ) -> datafusion_common::Result<()> {
290        not_impl_err!("Method not implemented")
291    }
292
293    fn try_decode_table_provider(
294        &self,
295        _buf: &[u8],
296        _table_ref: &TableReference,
297        _schema: arrow::datatypes::SchemaRef,
298        _ctx: &SessionContext,
299    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
300        not_impl_err!("Method not implemented")
301    }
302
303    fn try_encode_table_provider(
304        &self,
305        _table_ref: &TableReference,
306        _node: Arc<dyn datafusion::datasource::TableProvider>,
307        _buf: &mut Vec<u8>,
308    ) -> datafusion_common::Result<()> {
309        not_impl_err!("Method not implemented")
310    }
311
312    fn try_decode_file_format(
313        &self,
314        buf: &[u8],
315        _ctx: &SessionContext,
316    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
317        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
318            DataFusionError::Execution(format!(
319                "Failed to decode JsonOptionsProto: {:?}",
320                e
321            ))
322        })?;
323        let options: JsonOptions = (&proto).into();
324        Ok(Arc::new(JsonFormatFactory {
325            options: Some(options),
326        }))
327    }
328
329    fn try_encode_file_format(
330        &self,
331        buf: &mut Vec<u8>,
332        node: Arc<dyn FileFormatFactory>,
333    ) -> datafusion_common::Result<()> {
334        let options = if let Some(json_factory) =
335            node.as_any().downcast_ref::<JsonFormatFactory>()
336        {
337            json_factory.options.clone().unwrap_or_default()
338        } else {
339            return Err(DataFusionError::Execution(
340                "Unsupported FileFormatFactory type".to_string(),
341            ));
342        };
343
344        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
345            options: Some(options),
346        });
347
348        proto.encode(buf).map_err(|e| {
349            DataFusionError::Execution(format!("Failed to encode JsonOptions: {:?}", e))
350        })?;
351
352        Ok(())
353    }
354}
355
impl TableParquetOptionsProto {
    /// Serialize the `TableParquetOptions` held by a [`ParquetFormatFactory`]
    /// into proto form.
    ///
    /// A factory without options maps to the all-default proto message.
    /// Optional fields are carried through dedicated proto `oneof` wrappers
    /// (the `*_opt` fields) so that "unset" survives the round trip.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        #[allow(deprecated)] // max_statistics_size
        TableParquetOptionsProto {
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| {
                    parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
            }),
            // Per-column overrides: one entry per (column name, options) pair.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                        max_statistics_size_opt: options.max_statistics_size.map(|size| {
                            parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32)
                        }),
                    })
                }
            }).collect(),
            // Only metadata entries whose value is `Some` are serialized;
            // `None` values are dropped here and do not round-trip.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
462
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Decode the global parquet options from their proto form.
    ///
    /// Each `*_opt` field is a proto `oneof` wrapper with a single variant,
    /// so the matches below are exhaustive with one arm; `u64` sizes are
    /// narrowed back to `usize`.
    fn from(proto: &ParquetOptionsProto) -> Self {
        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt {
                parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize,
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
        }
    }
}
523
524impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
525    fn from(proto: ParquetColumnOptionsProto) -> Self {
526        #[allow(deprecated)] // max_statistics_size
527        ParquetColumnOptions {
528            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
529                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
530            ),
531            encoding: proto
532                .encoding_opt
533                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
534            dictionary_enabled: proto.dictionary_enabled_opt.map(
535                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
536            ),
537            compression: proto
538                .compression_opt
539                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
540            statistics_enabled: proto.statistics_enabled_opt.map(
541                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
542            ),
543            bloom_filter_fpp: proto
544                .bloom_filter_fpp_opt
545                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
546            bloom_filter_ndv: proto
547                .bloom_filter_ndv_opt
548                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
549            max_statistics_size: proto.max_statistics_size_opt.map(
550                |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| {
551                    v as usize
552                },
553            ),
554        }
555    }
556}
557
558impl From<&TableParquetOptionsProto> for TableParquetOptions {
559    fn from(proto: &TableParquetOptionsProto) -> Self {
560        TableParquetOptions {
561            global: proto
562                .global
563                .as_ref()
564                .map(ParquetOptions::from)
565                .unwrap_or_default(),
566            column_specific_options: proto
567                .column_specific_options
568                .iter()
569                .map(|parquet_column_options| {
570                    (
571                        parquet_column_options.column_name.clone(),
572                        ParquetColumnOptions::from(
573                            parquet_column_options.options.clone().unwrap_or_default(),
574                        ),
575                    )
576                })
577                .collect(),
578            key_value_metadata: proto
579                .key_value_metadata
580                .iter()
581                .map(|(k, v)| (k.clone(), Some(v.clone())))
582                .collect(),
583        }
584    }
585}
586
/// Extension codec that (de)serializes only the Parquet file-format options;
/// the plan and table-provider methods below are unimplemented placeholders.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
589
590// TODO! This is a placeholder for now and needs to be implemented for real.
591impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
592    fn try_decode(
593        &self,
594        _buf: &[u8],
595        _inputs: &[datafusion_expr::LogicalPlan],
596        _ctx: &SessionContext,
597    ) -> datafusion_common::Result<datafusion_expr::Extension> {
598        not_impl_err!("Method not implemented")
599    }
600
601    fn try_encode(
602        &self,
603        _node: &datafusion_expr::Extension,
604        _buf: &mut Vec<u8>,
605    ) -> datafusion_common::Result<()> {
606        not_impl_err!("Method not implemented")
607    }
608
609    fn try_decode_table_provider(
610        &self,
611        _buf: &[u8],
612        _table_ref: &TableReference,
613        _schema: arrow::datatypes::SchemaRef,
614        _ctx: &SessionContext,
615    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
616        not_impl_err!("Method not implemented")
617    }
618
619    fn try_encode_table_provider(
620        &self,
621        _table_ref: &TableReference,
622        _node: Arc<dyn datafusion::datasource::TableProvider>,
623        _buf: &mut Vec<u8>,
624    ) -> datafusion_common::Result<()> {
625        not_impl_err!("Method not implemented")
626    }
627
628    fn try_decode_file_format(
629        &self,
630        buf: &[u8],
631        _ctx: &SessionContext,
632    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
633        let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
634            DataFusionError::Execution(format!(
635                "Failed to decode TableParquetOptionsProto: {:?}",
636                e
637            ))
638        })?;
639        let options: TableParquetOptions = (&proto).into();
640        Ok(Arc::new(ParquetFormatFactory {
641            options: Some(options),
642        }))
643    }
644
645    fn try_encode_file_format(
646        &self,
647        buf: &mut Vec<u8>,
648        node: Arc<dyn FileFormatFactory>,
649    ) -> datafusion_common::Result<()> {
650        let options = if let Some(parquet_factory) =
651            node.as_any().downcast_ref::<ParquetFormatFactory>()
652        {
653            parquet_factory.options.clone().unwrap_or_default()
654        } else {
655            return Err(DataFusionError::Execution(
656                "Unsupported FileFormatFactory type".to_string(),
657            ));
658        };
659
660        let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
661            options: Some(options),
662        });
663
664        proto.encode(buf).map_err(|e| {
665            DataFusionError::Execution(format!(
666                "Failed to encode TableParquetOptionsProto: {:?}",
667                e
668            ))
669        })?;
670
671        Ok(())
672    }
673}
674
/// Extension codec for the Arrow file format; since the Arrow format factory
/// carries no options here, the file-format methods are trivial and the plan
/// and table-provider methods are unimplemented placeholders.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
677
678// TODO! This is a placeholder for now and needs to be implemented for real.
679impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
680    fn try_decode(
681        &self,
682        _buf: &[u8],
683        _inputs: &[datafusion_expr::LogicalPlan],
684        _ctx: &SessionContext,
685    ) -> datafusion_common::Result<datafusion_expr::Extension> {
686        not_impl_err!("Method not implemented")
687    }
688
689    fn try_encode(
690        &self,
691        _node: &datafusion_expr::Extension,
692        _buf: &mut Vec<u8>,
693    ) -> datafusion_common::Result<()> {
694        not_impl_err!("Method not implemented")
695    }
696
697    fn try_decode_table_provider(
698        &self,
699        _buf: &[u8],
700        _table_ref: &TableReference,
701        _schema: arrow::datatypes::SchemaRef,
702        _ctx: &SessionContext,
703    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
704        not_impl_err!("Method not implemented")
705    }
706
707    fn try_encode_table_provider(
708        &self,
709        _table_ref: &TableReference,
710        _node: Arc<dyn datafusion::datasource::TableProvider>,
711        _buf: &mut Vec<u8>,
712    ) -> datafusion_common::Result<()> {
713        not_impl_err!("Method not implemented")
714    }
715
716    fn try_decode_file_format(
717        &self,
718        __buf: &[u8],
719        __ctx: &SessionContext,
720    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
721        Ok(Arc::new(ArrowFormatFactory::new()))
722    }
723
724    fn try_encode_file_format(
725        &self,
726        __buf: &mut Vec<u8>,
727        __node: Arc<dyn FileFormatFactory>,
728    ) -> datafusion_common::Result<()> {
729        Ok(())
730    }
731}
732
/// Extension codec for the Avro file format; the plan and table-provider
/// methods below are unimplemented placeholders.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
735
736// TODO! This is a placeholder for now and needs to be implemented for real.
737impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
738    fn try_decode(
739        &self,
740        _buf: &[u8],
741        _inputs: &[datafusion_expr::LogicalPlan],
742        _ctx: &SessionContext,
743    ) -> datafusion_common::Result<datafusion_expr::Extension> {
744        not_impl_err!("Method not implemented")
745    }
746
747    fn try_encode(
748        &self,
749        _node: &datafusion_expr::Extension,
750        _buf: &mut Vec<u8>,
751    ) -> datafusion_common::Result<()> {
752        not_impl_err!("Method not implemented")
753    }
754
755    fn try_decode_table_provider(
756        &self,
757        _buf: &[u8],
758        _table_ref: &TableReference,
759        _schema: arrow::datatypes::SchemaRef,
760        _cts: &SessionContext,
761    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
762        not_impl_err!("Method not implemented")
763    }
764
765    fn try_encode_table_provider(
766        &self,
767        _table_ref: &TableReference,
768        _node: Arc<dyn datafusion::datasource::TableProvider>,
769        _buf: &mut Vec<u8>,
770    ) -> datafusion_common::Result<()> {
771        not_impl_err!("Method not implemented")
772    }
773
774    fn try_decode_file_format(
775        &self,
776        __buf: &[u8],
777        __ctx: &SessionContext,
778    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
779        Ok(Arc::new(ArrowFormatFactory::new()))
780    }
781
782    fn try_encode_file_format(
783        &self,
784        __buf: &mut Vec<u8>,
785        __node: Arc<dyn FileFormatFactory>,
786    ) -> datafusion_common::Result<()> {
787        Ok(())
788    }
789}