datafusion_proto/logical_plan/file_formats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::{
21    config::{
22        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23        TableParquetOptions,
24    },
25    datasource::file_format::{
26        arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27        parquet::ParquetFormatFactory, FileFormatFactory,
28    },
29    prelude::SessionContext,
30};
31use datafusion_common::{
32    exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33    TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38    parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39    JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40    ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41    TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// [`LogicalExtensionCodec`] that serializes the options of a
/// [`CsvFormatFactory`] to/from protobuf. Only the file-format codec
/// methods are functional; the plan and table-provider methods below are
/// unimplemented placeholders.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50    fn from_factory(factory: &CsvFormatFactory) -> Self {
51        if let Some(options) = &factory.options {
52            CsvOptionsProto {
53                has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54                delimiter: vec![options.delimiter],
55                quote: vec![options.quote],
56                terminator: options.terminator.map_or(vec![], |v| vec![v]),
57                escape: options.escape.map_or(vec![], |v| vec![v]),
58                double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59                compression: options.compression as i32,
60                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61                date_format: options.date_format.clone().unwrap_or_default(),
62                datetime_format: options.datetime_format.clone().unwrap_or_default(),
63                timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64                timestamp_tz_format: options
65                    .timestamp_tz_format
66                    .clone()
67                    .unwrap_or_default(),
68                time_format: options.time_format.clone().unwrap_or_default(),
69                null_value: options.null_value.clone().unwrap_or_default(),
70                null_regex: options.null_regex.clone().unwrap_or_default(),
71                comment: options.comment.map_or(vec![], |v| vec![v]),
72                newlines_in_values: options
73                    .newlines_in_values
74                    .map_or(vec![], |v| vec![v as u8]),
75            }
76        } else {
77            CsvOptionsProto::default()
78        }
79    }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83    fn from(proto: &CsvOptionsProto) -> Self {
84        CsvOptions {
85            has_header: if !proto.has_header.is_empty() {
86                Some(proto.has_header[0] != 0)
87            } else {
88                None
89            },
90            delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91            quote: proto.quote.first().copied().unwrap_or(b'"'),
92            terminator: if !proto.terminator.is_empty() {
93                Some(proto.terminator[0])
94            } else {
95                None
96            },
97            escape: if !proto.escape.is_empty() {
98                Some(proto.escape[0])
99            } else {
100                None
101            },
102            double_quote: if !proto.double_quote.is_empty() {
103                Some(proto.double_quote[0] != 0)
104            } else {
105                None
106            },
107            compression: match proto.compression {
108                0 => CompressionTypeVariant::GZIP,
109                1 => CompressionTypeVariant::BZIP2,
110                2 => CompressionTypeVariant::XZ,
111                3 => CompressionTypeVariant::ZSTD,
112                _ => CompressionTypeVariant::UNCOMPRESSED,
113            },
114            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115            date_format: if proto.date_format.is_empty() {
116                None
117            } else {
118                Some(proto.date_format.clone())
119            },
120            datetime_format: if proto.datetime_format.is_empty() {
121                None
122            } else {
123                Some(proto.datetime_format.clone())
124            },
125            timestamp_format: if proto.timestamp_format.is_empty() {
126                None
127            } else {
128                Some(proto.timestamp_format.clone())
129            },
130            timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131                None
132            } else {
133                Some(proto.timestamp_tz_format.clone())
134            },
135            time_format: if proto.time_format.is_empty() {
136                None
137            } else {
138                Some(proto.time_format.clone())
139            },
140            null_value: if proto.null_value.is_empty() {
141                None
142            } else {
143                Some(proto.null_value.clone())
144            },
145            null_regex: if proto.null_regex.is_empty() {
146                None
147            } else {
148                Some(proto.null_regex.clone())
149            },
150            comment: if !proto.comment.is_empty() {
151                Some(proto.comment[0])
152            } else {
153                None
154            },
155            newlines_in_values: if proto.newlines_in_values.is_empty() {
156                None
157            } else {
158                Some(proto.newlines_in_values[0] != 0)
159            },
160        }
161    }
162}
163
164// TODO! This is a placeholder for now and needs to be implemented for real.
165impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166    fn try_decode(
167        &self,
168        _buf: &[u8],
169        _inputs: &[datafusion_expr::LogicalPlan],
170        _ctx: &SessionContext,
171    ) -> datafusion_common::Result<datafusion_expr::Extension> {
172        not_impl_err!("Method not implemented")
173    }
174
175    fn try_encode(
176        &self,
177        _node: &datafusion_expr::Extension,
178        _buf: &mut Vec<u8>,
179    ) -> datafusion_common::Result<()> {
180        not_impl_err!("Method not implemented")
181    }
182
183    fn try_decode_table_provider(
184        &self,
185        _buf: &[u8],
186        _table_ref: &TableReference,
187        _schema: arrow::datatypes::SchemaRef,
188        _ctx: &SessionContext,
189    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190        not_impl_err!("Method not implemented")
191    }
192
193    fn try_encode_table_provider(
194        &self,
195        _table_ref: &TableReference,
196        _node: Arc<dyn datafusion::datasource::TableProvider>,
197        _buf: &mut Vec<u8>,
198    ) -> datafusion_common::Result<()> {
199        not_impl_err!("Method not implemented")
200    }
201
202    fn try_decode_file_format(
203        &self,
204        buf: &[u8],
205        _ctx: &SessionContext,
206    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207        let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208            DataFusionError::Execution(format!("Failed to decode CsvOptionsProto: {e:?}"))
209        })?;
210        let options: CsvOptions = (&proto).into();
211        Ok(Arc::new(CsvFormatFactory {
212            options: Some(options),
213        }))
214    }
215
216    fn try_encode_file_format(
217        &self,
218        buf: &mut Vec<u8>,
219        node: Arc<dyn FileFormatFactory>,
220    ) -> datafusion_common::Result<()> {
221        let options =
222            if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
223                csv_factory.options.clone().unwrap_or_default()
224            } else {
225                return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
226            };
227
228        let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
229            options: Some(options),
230        });
231
232        proto.encode(buf).map_err(|e| {
233            DataFusionError::Execution(format!("Failed to encode CsvOptions: {e:?}"))
234        })?;
235
236        Ok(())
237    }
238}
239
240impl JsonOptionsProto {
241    fn from_factory(factory: &JsonFormatFactory) -> Self {
242        if let Some(options) = &factory.options {
243            JsonOptionsProto {
244                compression: options.compression as i32,
245                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
246            }
247        } else {
248            JsonOptionsProto::default()
249        }
250    }
251}
252
253impl From<&JsonOptionsProto> for JsonOptions {
254    fn from(proto: &JsonOptionsProto) -> Self {
255        JsonOptions {
256            compression: match proto.compression {
257                0 => CompressionTypeVariant::GZIP,
258                1 => CompressionTypeVariant::BZIP2,
259                2 => CompressionTypeVariant::XZ,
260                3 => CompressionTypeVariant::ZSTD,
261                _ => CompressionTypeVariant::UNCOMPRESSED,
262            },
263            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
264        }
265    }
266}
267
/// [`LogicalExtensionCodec`] that serializes the options of a
/// [`JsonFormatFactory`] to/from protobuf. Only the file-format codec
/// methods are functional; the plan and table-provider methods below are
/// unimplemented placeholders.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
270
271// TODO! This is a placeholder for now and needs to be implemented for real.
272impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
273    fn try_decode(
274        &self,
275        _buf: &[u8],
276        _inputs: &[datafusion_expr::LogicalPlan],
277        _ctx: &SessionContext,
278    ) -> datafusion_common::Result<datafusion_expr::Extension> {
279        not_impl_err!("Method not implemented")
280    }
281
282    fn try_encode(
283        &self,
284        _node: &datafusion_expr::Extension,
285        _buf: &mut Vec<u8>,
286    ) -> datafusion_common::Result<()> {
287        not_impl_err!("Method not implemented")
288    }
289
290    fn try_decode_table_provider(
291        &self,
292        _buf: &[u8],
293        _table_ref: &TableReference,
294        _schema: arrow::datatypes::SchemaRef,
295        _ctx: &SessionContext,
296    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
297        not_impl_err!("Method not implemented")
298    }
299
300    fn try_encode_table_provider(
301        &self,
302        _table_ref: &TableReference,
303        _node: Arc<dyn datafusion::datasource::TableProvider>,
304        _buf: &mut Vec<u8>,
305    ) -> datafusion_common::Result<()> {
306        not_impl_err!("Method not implemented")
307    }
308
309    fn try_decode_file_format(
310        &self,
311        buf: &[u8],
312        _ctx: &SessionContext,
313    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
314        let proto = JsonOptionsProto::decode(buf).map_err(|e| {
315            DataFusionError::Execution(format!(
316                "Failed to decode JsonOptionsProto: {e:?}"
317            ))
318        })?;
319        let options: JsonOptions = (&proto).into();
320        Ok(Arc::new(JsonFormatFactory {
321            options: Some(options),
322        }))
323    }
324
325    fn try_encode_file_format(
326        &self,
327        buf: &mut Vec<u8>,
328        node: Arc<dyn FileFormatFactory>,
329    ) -> datafusion_common::Result<()> {
330        let options = if let Some(json_factory) =
331            node.as_any().downcast_ref::<JsonFormatFactory>()
332        {
333            json_factory.options.clone().unwrap_or_default()
334        } else {
335            return Err(DataFusionError::Execution(
336                "Unsupported FileFormatFactory type".to_string(),
337            ));
338        };
339
340        let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
341            options: Some(options),
342        });
343
344        proto.encode(buf).map_err(|e| {
345            DataFusionError::Execution(format!("Failed to encode JsonOptions: {e:?}"))
346        })?;
347
348        Ok(())
349    }
350}
351
impl TableParquetOptionsProto {
    /// Convert the options held by a [`ParquetFormatFactory`] into their
    /// protobuf representation. A factory without options maps to the
    /// default (empty) proto message.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        #[allow(deprecated)] // max_statistics_size
        TableParquetOptionsProto {
            // Optional global settings are wrapped in their generated
            // protobuf `oneof` types; `usize` fields are widened to `u64`
            // for the wire format.
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                // The closure parameter is named `compression` but carries the
                // coerce_int96 value; only the name is misleading.
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
            }),
            // Each (column name, options) pair becomes its own
            // `ParquetColumnSpecificOptions` message.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                    })
                }
            }).collect(),
            // Entries whose value is `None` are dropped: the proto map stores
            // only key/value pairs with concrete string values, so `None`
            // values do not survive a round trip.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
452
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Rebuild [`ParquetOptions`] from its protobuf form, unwrapping the
    /// single-variant `oneof` wrappers and narrowing `u64` wire values back
    /// to `usize`.
    ///
    /// NOTE(review): the `as usize` casts assume a 64-bit target; on a
    /// 32-bit platform oversized wire values would silently truncate —
    /// confirm this is acceptable.
    fn from(proto: &ParquetOptionsProto) -> Self {
        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
        }
    }
}
510
511impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
512    fn from(proto: ParquetColumnOptionsProto) -> Self {
513        #[allow(deprecated)] // max_statistics_size
514        ParquetColumnOptions {
515            bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
516                |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
517            ),
518            encoding: proto
519                .encoding_opt
520                .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
521            dictionary_enabled: proto.dictionary_enabled_opt.map(
522                |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
523            ),
524            compression: proto
525                .compression_opt
526                .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
527            statistics_enabled: proto.statistics_enabled_opt.map(
528                |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
529            ),
530            bloom_filter_fpp: proto
531                .bloom_filter_fpp_opt
532                .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
533            bloom_filter_ndv: proto
534                .bloom_filter_ndv_opt
535                .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
536        }
537    }
538}
539
540impl From<&TableParquetOptionsProto> for TableParquetOptions {
541    fn from(proto: &TableParquetOptionsProto) -> Self {
542        TableParquetOptions {
543            global: proto
544                .global
545                .as_ref()
546                .map(ParquetOptions::from)
547                .unwrap_or_default(),
548            column_specific_options: proto
549                .column_specific_options
550                .iter()
551                .map(|parquet_column_options| {
552                    (
553                        parquet_column_options.column_name.clone(),
554                        ParquetColumnOptions::from(
555                            parquet_column_options.options.clone().unwrap_or_default(),
556                        ),
557                    )
558                })
559                .collect(),
560            key_value_metadata: proto
561                .key_value_metadata
562                .iter()
563                .map(|(k, v)| (k.clone(), Some(v.clone())))
564                .collect(),
565            crypto: Default::default(),
566        }
567    }
568}
569
/// [`LogicalExtensionCodec`] that serializes the options of a
/// [`ParquetFormatFactory`] to/from protobuf. Only the file-format codec
/// methods are functional; the plan and table-provider methods below are
/// unimplemented placeholders.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
572
573// TODO! This is a placeholder for now and needs to be implemented for real.
574impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
575    fn try_decode(
576        &self,
577        _buf: &[u8],
578        _inputs: &[datafusion_expr::LogicalPlan],
579        _ctx: &SessionContext,
580    ) -> datafusion_common::Result<datafusion_expr::Extension> {
581        not_impl_err!("Method not implemented")
582    }
583
584    fn try_encode(
585        &self,
586        _node: &datafusion_expr::Extension,
587        _buf: &mut Vec<u8>,
588    ) -> datafusion_common::Result<()> {
589        not_impl_err!("Method not implemented")
590    }
591
592    fn try_decode_table_provider(
593        &self,
594        _buf: &[u8],
595        _table_ref: &TableReference,
596        _schema: arrow::datatypes::SchemaRef,
597        _ctx: &SessionContext,
598    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
599        not_impl_err!("Method not implemented")
600    }
601
602    fn try_encode_table_provider(
603        &self,
604        _table_ref: &TableReference,
605        _node: Arc<dyn datafusion::datasource::TableProvider>,
606        _buf: &mut Vec<u8>,
607    ) -> datafusion_common::Result<()> {
608        not_impl_err!("Method not implemented")
609    }
610
611    fn try_decode_file_format(
612        &self,
613        buf: &[u8],
614        _ctx: &SessionContext,
615    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
616        let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
617            DataFusionError::Execution(format!(
618                "Failed to decode TableParquetOptionsProto: {e:?}"
619            ))
620        })?;
621        let options: TableParquetOptions = (&proto).into();
622        Ok(Arc::new(ParquetFormatFactory {
623            options: Some(options),
624        }))
625    }
626
627    fn try_encode_file_format(
628        &self,
629        buf: &mut Vec<u8>,
630        node: Arc<dyn FileFormatFactory>,
631    ) -> datafusion_common::Result<()> {
632        let options = if let Some(parquet_factory) =
633            node.as_any().downcast_ref::<ParquetFormatFactory>()
634        {
635            parquet_factory.options.clone().unwrap_or_default()
636        } else {
637            return Err(DataFusionError::Execution(
638                "Unsupported FileFormatFactory type".to_string(),
639            ));
640        };
641
642        let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
643            options: Some(options),
644        });
645
646        proto.encode(buf).map_err(|e| {
647            DataFusionError::Execution(format!(
648                "Failed to encode TableParquetOptionsProto: {e:?}"
649            ))
650        })?;
651
652        Ok(())
653    }
654}
655
/// [`LogicalExtensionCodec`] for the Arrow IPC file format. Arrow has no
/// serializable format options in this codec, so encode is a no-op and
/// decode returns a fresh [`ArrowFormatFactory`]; the plan and
/// table-provider methods below are unimplemented placeholders.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
658
659// TODO! This is a placeholder for now and needs to be implemented for real.
660impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
661    fn try_decode(
662        &self,
663        _buf: &[u8],
664        _inputs: &[datafusion_expr::LogicalPlan],
665        _ctx: &SessionContext,
666    ) -> datafusion_common::Result<datafusion_expr::Extension> {
667        not_impl_err!("Method not implemented")
668    }
669
670    fn try_encode(
671        &self,
672        _node: &datafusion_expr::Extension,
673        _buf: &mut Vec<u8>,
674    ) -> datafusion_common::Result<()> {
675        not_impl_err!("Method not implemented")
676    }
677
678    fn try_decode_table_provider(
679        &self,
680        _buf: &[u8],
681        _table_ref: &TableReference,
682        _schema: arrow::datatypes::SchemaRef,
683        _ctx: &SessionContext,
684    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
685        not_impl_err!("Method not implemented")
686    }
687
688    fn try_encode_table_provider(
689        &self,
690        _table_ref: &TableReference,
691        _node: Arc<dyn datafusion::datasource::TableProvider>,
692        _buf: &mut Vec<u8>,
693    ) -> datafusion_common::Result<()> {
694        not_impl_err!("Method not implemented")
695    }
696
697    fn try_decode_file_format(
698        &self,
699        __buf: &[u8],
700        __ctx: &SessionContext,
701    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
702        Ok(Arc::new(ArrowFormatFactory::new()))
703    }
704
705    fn try_encode_file_format(
706        &self,
707        __buf: &mut Vec<u8>,
708        __node: Arc<dyn FileFormatFactory>,
709    ) -> datafusion_common::Result<()> {
710        Ok(())
711    }
712}
713
/// [`LogicalExtensionCodec`] placeholder for the Avro file format. The
/// plan and table-provider methods below are unimplemented, and the
/// file-format methods currently mirror the Arrow codec's no-op behavior.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
716
717// TODO! This is a placeholder for now and needs to be implemented for real.
718impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
719    fn try_decode(
720        &self,
721        _buf: &[u8],
722        _inputs: &[datafusion_expr::LogicalPlan],
723        _ctx: &SessionContext,
724    ) -> datafusion_common::Result<datafusion_expr::Extension> {
725        not_impl_err!("Method not implemented")
726    }
727
728    fn try_encode(
729        &self,
730        _node: &datafusion_expr::Extension,
731        _buf: &mut Vec<u8>,
732    ) -> datafusion_common::Result<()> {
733        not_impl_err!("Method not implemented")
734    }
735
736    fn try_decode_table_provider(
737        &self,
738        _buf: &[u8],
739        _table_ref: &TableReference,
740        _schema: arrow::datatypes::SchemaRef,
741        _cts: &SessionContext,
742    ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
743        not_impl_err!("Method not implemented")
744    }
745
746    fn try_encode_table_provider(
747        &self,
748        _table_ref: &TableReference,
749        _node: Arc<dyn datafusion::datasource::TableProvider>,
750        _buf: &mut Vec<u8>,
751    ) -> datafusion_common::Result<()> {
752        not_impl_err!("Method not implemented")
753    }
754
755    fn try_decode_file_format(
756        &self,
757        __buf: &[u8],
758        __ctx: &SessionContext,
759    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
760        Ok(Arc::new(ArrowFormatFactory::new()))
761    }
762
763    fn try_encode_file_format(
764        &self,
765        __buf: &mut Vec<u8>,
766        __node: Arc<dyn FileFormatFactory>,
767    ) -> datafusion_common::Result<()> {
768        Ok(())
769    }
770}