// datafusion_proto/logical_plan/file_formats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::{
21    config::{
22        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23        TableParquetOptions,
24    },
25    datasource::file_format::{
26        arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27        parquet::ParquetFormatFactory, FileFormatFactory,
28    },
29    prelude::SessionContext,
30};
31use datafusion_common::{
32    exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33    TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38    parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39    JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40    ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41    TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// Codec for (de)serializing CSV file-format options; the generic
/// `LogicalExtensionCodec` methods are placeholder stubs (see TODO below).
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50    fn from_factory(factory: &CsvFormatFactory) -> Self {
51        if let Some(options) = &factory.options {
52            CsvOptionsProto {
53                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54                delimiter: vec![options.delimiter],
55                quote: vec![options.quote],
56                terminator: options.terminator.map_or(vec![], |v| vec![v]),
57                escape: options.escape.map_or(vec![], |v| vec![v]),
58                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59                compression: options.compression as i32,
60                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61                date_format: options.date_format.clone().unwrap_or_default(),
62                datetime_format: options.datetime_format.clone().unwrap_or_default(),
63                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64                timestamp_tz_format: options
65                    .timestamp_tz_format
66                    .clone()
67                    .unwrap_or_default(),
68                time_format: options.time_format.clone().unwrap_or_default(),
69                null_value: options.null_value.clone().unwrap_or_default(),
70                null_regex: options.null_regex.clone().unwrap_or_default(),
71                comment: options.comment.map_or(vec![], |v| vec![v]),
72                newlines_in_values: options
73                    .newlines_in_values
74                    .map_or(vec![], |v| vec![v as u8]),
75            }
76        } else {
77            CsvOptionsProto::default()
78        }
79    }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83    fn from(proto: &CsvOptionsProto) -> Self {
84        CsvOptions {
85            has_header: if !proto.has_header.is_empty() {
86                Some(proto.has_header[0] != 0)
87            } else {
88                None
89            },
90            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91            quote: proto.quote.first().copied().unwrap_or(b'"'),
92            terminator: if !proto.terminator.is_empty() {
93                Some(proto.terminator[0])
94            } else {
95                None
96            },
97            escape: if !proto.escape.is_empty() {
98                Some(proto.escape[0])
99            } else {
100                None
101            },
102            double_quote: if !proto.double_quote.is_empty() {
103                Some(proto.double_quote[0] != 0)
104            } else {
105                None
106            },
107            compression: match proto.compression {
108                0 => CompressionTypeVariant::GZIP,
109                1 => CompressionTypeVariant::BZIP2,
110                2 => CompressionTypeVariant::XZ,
111                3 => CompressionTypeVariant::ZSTD,
112                _ => CompressionTypeVariant::UNCOMPRESSED,
113            },
114            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115            date_format: if proto.date_format.is_empty() {
116                None
117            } else {
118                Some(proto.date_format.clone())
119            },
120            datetime_format: if proto.datetime_format.is_empty() {
121                None
122            } else {
123                Some(proto.datetime_format.clone())
124            },
125            timestamp_format: if proto.timestamp_format.is_empty() {
126                None
127            } else {
128                Some(proto.timestamp_format.clone())
129            },
130            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131                None
132            } else {
133                Some(proto.timestamp_tz_format.clone())
134            },
135            time_format: if proto.time_format.is_empty() {
136                None
137            } else {
138                Some(proto.time_format.clone())
139            },
140            null_value: if proto.null_value.is_empty() {
141                None
142            } else {
143                Some(proto.null_value.clone())
144            },
145            null_regex: if proto.null_regex.is_empty() {
146                None
147            } else {
148                Some(proto.null_regex.clone())
149            },
150            comment: if !proto.comment.is_empty() {
151                Some(proto.comment[0])
152            } else {
153                None
154            },
155            newlines_in_values: if proto.newlines_in_values.is_empty() {
156                None
157            } else {
158                Some(proto.newlines_in_values[0] != 0)
159            },
160        }
161    }
162}
163
164// TODO! This is a placeholder for now and needs to be implemented for real.
165impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166    fn try_decode(
167        &self,
168        _buf: &[u8],
169        _inputs: &[datafusion_expr::LogicalPlan],
170        _ctx: &SessionContext,
171    ) -> datafusion_common::Result<datafusion_expr::Extension> {
172        not_impl_err!("Method not implemented")
173    }
174
175    fn try_encode(
176        &self,
177        _node: &datafusion_expr::Extension,
178        _buf: &mut Vec<u8>,
179    ) -> datafusion_common::Result<()> {
180        not_impl_err!("Method not implemented")
181    }
182
183    fn try_decode_table_provider(
184        &self,
185        _buf: &[u8],
186        _table_ref: &TableReference,
187        _schema: arrow::datatypes::SchemaRef,
188        _ctx: &SessionContext,
189    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190        not_impl_err!("Method not implemented")
191    }
192
193    fn try_encode_table_provider(
194        &self,
195        _table_ref: &TableReference,
196        _node: Arc<dyn datafusion::datasource::TableProvider>,
197        _buf: &mut Vec<u8>,
198    ) -> datafusion_common::Result<()> {
199        not_impl_err!("Method not implemented")
200    }
201
202    fn try_decode_file_format(
203        &self,
204        buf: &[u8],
205        _ctx: &SessionContext,
206    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208            DataFusionError::Execution(format!(
209                "Failed to decode CsvOptionsProto: {:?}",
210                e
211            ))
212        })?;
213        let options: CsvOptions = (&proto).into();
214        Ok(Arc::new(CsvFormatFactory {
215            options: Some(options),
216        }))
217    }
218
219    fn try_encode_file_format(
220        &self,
221        buf: &mut Vec<u8>,
222        node: Arc<dyn FileFormatFactory>,
223    ) -> datafusion_common::Result<()> {
224        let options =
225            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
226                csv_factory.options.clone().unwrap_or_default()
227            } else {
228                return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
229            };
230
231        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
232            options: Some(options),
233        });
234
235        proto.encode(buf).map_err(|e| {
236            DataFusionError::Execution(format!("Failed to encode CsvOptions: {:?}", e))
237        })?;
238
239        Ok(())
240    }
241}
242
243impl JsonOptionsProto {
244    fn from_factory(factory: &JsonFormatFactory) -> Self {
245        if let Some(options) = &factory.options {
246            JsonOptionsProto {
247                compression: options.compression as i32,
248                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
249            }
250        } else {
251            JsonOptionsProto::default()
252        }
253    }
254}
255
256impl From<&JsonOptionsProto> for JsonOptions {
257    fn from(proto: &JsonOptionsProto) -> Self {
258        JsonOptions {
259            compression: match proto.compression {
260                0 => CompressionTypeVariant::GZIP,
261                1 => CompressionTypeVariant::BZIP2,
262                2 => CompressionTypeVariant::XZ,
263                3 => CompressionTypeVariant::ZSTD,
264                _ => CompressionTypeVariant::UNCOMPRESSED,
265            },
266            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
267        }
268    }
269}
270
/// Codec for (de)serializing JSON file-format options; the generic
/// `LogicalExtensionCodec` methods are placeholder stubs (see TODO below).
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
273
274// TODO! This is a placeholder for now and needs to be implemented for real.
275impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
276    fn try_decode(
277        &self,
278        _buf: &[u8],
279        _inputs: &[datafusion_expr::LogicalPlan],
280        _ctx: &SessionContext,
281    ) -> datafusion_common::Result<datafusion_expr::Extension> {
282        not_impl_err!("Method not implemented")
283    }
284
285    fn try_encode(
286        &self,
287        _node: &datafusion_expr::Extension,
288        _buf: &mut Vec<u8>,
289    ) -> datafusion_common::Result<()> {
290        not_impl_err!("Method not implemented")
291    }
292
293    fn try_decode_table_provider(
294        &self,
295        _buf: &[u8],
296        _table_ref: &TableReference,
297        _schema: arrow::datatypes::SchemaRef,
298        _ctx: &SessionContext,
299    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
300        not_impl_err!("Method not implemented")
301    }
302
303    fn try_encode_table_provider(
304        &self,
305        _table_ref: &TableReference,
306        _node: Arc<dyn datafusion::datasource::TableProvider>,
307        _buf: &mut Vec<u8>,
308    ) -> datafusion_common::Result<()> {
309        not_impl_err!("Method not implemented")
310    }
311
312    fn try_decode_file_format(
313        &self,
314        buf: &[u8],
315        _ctx: &SessionContext,
316    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
317        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
318            DataFusionError::Execution(format!(
319                "Failed to decode JsonOptionsProto: {:?}",
320                e
321            ))
322        })?;
323        let options: JsonOptions = (&proto).into();
324        Ok(Arc::new(JsonFormatFactory {
325            options: Some(options),
326        }))
327    }
328
329    fn try_encode_file_format(
330        &self,
331        buf: &mut Vec<u8>,
332        node: Arc<dyn FileFormatFactory>,
333    ) -> datafusion_common::Result<()> {
334        let options = if let Some(json_factory) =
335            node.as_any().downcast_ref::<JsonFormatFactory>()
336        {
337            json_factory.options.clone().unwrap_or_default()
338        } else {
339            return Err(DataFusionError::Execution(
340                "Unsupported FileFormatFactory type".to_string(),
341            ));
342        };
343
344        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
345            options: Some(options),
346        });
347
348        proto.encode(buf).map_err(|e| {
349            DataFusionError::Execution(format!("Failed to encode JsonOptions: {:?}", e))
350        })?;
351
352        Ok(())
353    }
354}
355
impl TableParquetOptionsProto {
    /// Convert the options held by a `ParquetFormatFactory` into their
    /// protobuf representation.
    ///
    /// A factory without options serializes as the default proto. Optional
    /// scalar fields map onto dedicated `oneof` wrappers (`*_opt`) so that
    /// `None` is distinguishable from a default value on the wire.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        #[allow(deprecated)] // max_statistics_size
        TableParquetOptionsProto {
            // Global (table-wide) reader/writer settings; usize fields widen
            // to u64 for the wire format.
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| {
                    parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
            }),
            // Per-column overrides: one `ParquetColumnSpecificOptions` entry
            // per (column name, options) pair.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                        max_statistics_size_opt: options.max_statistics_size.map(|size| {
                            parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32)
                        }),
                    })
                }
            }).collect(),
            // Only keys with a present value are serialized; `None` values
            // are dropped rather than encoded as empty strings.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
459
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Rebuild the global `ParquetOptions` from their protobuf form.
    ///
    /// Each `*_opt` `oneof` wrapper unwraps back to an `Option`; u64 wire
    /// fields narrow back to usize (lossless on 64-bit targets).
    fn from(proto: &ParquetOptionsProto) -> Self {
        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt {
                parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize,
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
        }
    }
}
517
518impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
519    fn from(proto: ParquetColumnOptionsProto) -> Self {
520        #[allow(deprecated)] // max_statistics_size
521        ParquetColumnOptions {
522            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
523                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
524            ),
525            encoding: proto
526                .encoding_opt
527                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
528            dictionary_enabled: proto.dictionary_enabled_opt.map(
529                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
530            ),
531            compression: proto
532                .compression_opt
533                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
534            statistics_enabled: proto.statistics_enabled_opt.map(
535                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
536            ),
537            bloom_filter_fpp: proto
538                .bloom_filter_fpp_opt
539                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
540            bloom_filter_ndv: proto
541                .bloom_filter_ndv_opt
542                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
543            max_statistics_size: proto.max_statistics_size_opt.map(
544                |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| {
545                    v as usize
546                },
547            ),
548        }
549    }
550}
551
552impl From<&TableParquetOptionsProto> for TableParquetOptions {
553    fn from(proto: &TableParquetOptionsProto) -> Self {
554        TableParquetOptions {
555            global: proto
556                .global
557                .as_ref()
558                .map(ParquetOptions::from)
559                .unwrap_or_default(),
560            column_specific_options: proto
561                .column_specific_options
562                .iter()
563                .map(|parquet_column_options| {
564                    (
565                        parquet_column_options.column_name.clone(),
566                        ParquetColumnOptions::from(
567                            parquet_column_options.options.clone().unwrap_or_default(),
568                        ),
569                    )
570                })
571                .collect(),
572            key_value_metadata: proto
573                .key_value_metadata
574                .iter()
575                .map(|(k, v)| (k.clone(), Some(v.clone())))
576                .collect(),
577        }
578    }
579}
580
/// Codec for (de)serializing Parquet file-format options; the generic
/// `LogicalExtensionCodec` methods are placeholder stubs (see TODO below).
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
583
584// TODO! This is a placeholder for now and needs to be implemented for real.
585impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
586    fn try_decode(
587        &self,
588        _buf: &[u8],
589        _inputs: &[datafusion_expr::LogicalPlan],
590        _ctx: &SessionContext,
591    ) -> datafusion_common::Result<datafusion_expr::Extension> {
592        not_impl_err!("Method not implemented")
593    }
594
595    fn try_encode(
596        &self,
597        _node: &datafusion_expr::Extension,
598        _buf: &mut Vec<u8>,
599    ) -> datafusion_common::Result<()> {
600        not_impl_err!("Method not implemented")
601    }
602
603    fn try_decode_table_provider(
604        &self,
605        _buf: &[u8],
606        _table_ref: &TableReference,
607        _schema: arrow::datatypes::SchemaRef,
608        _ctx: &SessionContext,
609    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
610        not_impl_err!("Method not implemented")
611    }
612
613    fn try_encode_table_provider(
614        &self,
615        _table_ref: &TableReference,
616        _node: Arc<dyn datafusion::datasource::TableProvider>,
617        _buf: &mut Vec<u8>,
618    ) -> datafusion_common::Result<()> {
619        not_impl_err!("Method not implemented")
620    }
621
622    fn try_decode_file_format(
623        &self,
624        buf: &[u8],
625        _ctx: &SessionContext,
626    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
627        let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
628            DataFusionError::Execution(format!(
629                "Failed to decode TableParquetOptionsProto: {:?}",
630                e
631            ))
632        })?;
633        let options: TableParquetOptions = (&proto).into();
634        Ok(Arc::new(ParquetFormatFactory {
635            options: Some(options),
636        }))
637    }
638
639    fn try_encode_file_format(
640        &self,
641        buf: &mut Vec<u8>,
642        node: Arc<dyn FileFormatFactory>,
643    ) -> datafusion_common::Result<()> {
644        let options = if let Some(parquet_factory) =
645            node.as_any().downcast_ref::<ParquetFormatFactory>()
646        {
647            parquet_factory.options.clone().unwrap_or_default()
648        } else {
649            return Err(DataFusionError::Execution(
650                "Unsupported FileFormatFactory type".to_string(),
651            ));
652        };
653
654        let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
655            options: Some(options),
656        });
657
658        proto.encode(buf).map_err(|e| {
659            DataFusionError::Execution(format!(
660                "Failed to encode TableParquetOptionsProto: {:?}",
661                e
662            ))
663        })?;
664
665        Ok(())
666    }
667}
668
/// Codec for the Arrow IPC file format; Arrow carries no serializable options,
/// so the file-format methods are trivial. Other methods are placeholder stubs.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
671
672// TODO! This is a placeholder for now and needs to be implemented for real.
673impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
674    fn try_decode(
675        &self,
676        _buf: &[u8],
677        _inputs: &[datafusion_expr::LogicalPlan],
678        _ctx: &SessionContext,
679    ) -> datafusion_common::Result<datafusion_expr::Extension> {
680        not_impl_err!("Method not implemented")
681    }
682
683    fn try_encode(
684        &self,
685        _node: &datafusion_expr::Extension,
686        _buf: &mut Vec<u8>,
687    ) -> datafusion_common::Result<()> {
688        not_impl_err!("Method not implemented")
689    }
690
691    fn try_decode_table_provider(
692        &self,
693        _buf: &[u8],
694        _table_ref: &TableReference,
695        _schema: arrow::datatypes::SchemaRef,
696        _ctx: &SessionContext,
697    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
698        not_impl_err!("Method not implemented")
699    }
700
701    fn try_encode_table_provider(
702        &self,
703        _table_ref: &TableReference,
704        _node: Arc<dyn datafusion::datasource::TableProvider>,
705        _buf: &mut Vec<u8>,
706    ) -> datafusion_common::Result<()> {
707        not_impl_err!("Method not implemented")
708    }
709
710    fn try_decode_file_format(
711        &self,
712        __buf: &[u8],
713        __ctx: &SessionContext,
714    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
715        Ok(Arc::new(ArrowFormatFactory::new()))
716    }
717
718    fn try_encode_file_format(
719        &self,
720        __buf: &mut Vec<u8>,
721        __node: Arc<dyn FileFormatFactory>,
722    ) -> datafusion_common::Result<()> {
723        Ok(())
724    }
725}
726
/// Codec intended for the Avro file format; currently a placeholder whose
/// methods are stubs.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
729
730// TODO! This is a placeholder for now and needs to be implemented for real.
731impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
732    fn try_decode(
733        &self,
734        _buf: &[u8],
735        _inputs: &[datafusion_expr::LogicalPlan],
736        _ctx: &SessionContext,
737    ) -> datafusion_common::Result<datafusion_expr::Extension> {
738        not_impl_err!("Method not implemented")
739    }
740
741    fn try_encode(
742        &self,
743        _node: &datafusion_expr::Extension,
744        _buf: &mut Vec<u8>,
745    ) -> datafusion_common::Result<()> {
746        not_impl_err!("Method not implemented")
747    }
748
749    fn try_decode_table_provider(
750        &self,
751        _buf: &[u8],
752        _table_ref: &TableReference,
753        _schema: arrow::datatypes::SchemaRef,
754        _cts: &SessionContext,
755    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
756        not_impl_err!("Method not implemented")
757    }
758
759    fn try_encode_table_provider(
760        &self,
761        _table_ref: &TableReference,
762        _node: Arc<dyn datafusion::datasource::TableProvider>,
763        _buf: &mut Vec<u8>,
764    ) -> datafusion_common::Result<()> {
765        not_impl_err!("Method not implemented")
766    }
767
768    fn try_decode_file_format(
769        &self,
770        __buf: &[u8],
771        __ctx: &SessionContext,
772    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
773        Ok(Arc::new(ArrowFormatFactory::new()))
774    }
775
776    fn try_encode_file_format(
777        &self,
778        __buf: &mut Vec<u8>,
779        __node: Arc<dyn FileFormatFactory>,
780    ) -> datafusion_common::Result<()> {
781        Ok(())
782    }
783}