1use std::sync::Arc;
19
20use datafusion::{
21 config::{
22 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
23 TableParquetOptions,
24 },
25 datasource::file_format::{
26 arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
27 parquet::ParquetFormatFactory, FileFormatFactory,
28 },
29 prelude::SessionContext,
30};
31use datafusion_common::{
32 exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError,
33 TableReference,
34};
35use prost::Message;
36
37use crate::protobuf::{
38 parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
39 JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
40 ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
41 TableParquetOptions as TableParquetOptionsProto,
42};
43
44use super::LogicalExtensionCodec;
45
/// [`LogicalExtensionCodec`] that round-trips CSV file-format options
/// (`CsvOptions`) through their protobuf representation. Plan-node and
/// table-provider (de)serialization are not implemented by this codec.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
48
49impl CsvOptionsProto {
50 fn from_factory(factory: &CsvFormatFactory) -> Self {
51 if let Some(options) = &factory.options {
52 CsvOptionsProto {
53 has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
54 delimiter: vec![options.delimiter],
55 quote: vec![options.quote],
56 terminator: options.terminator.map_or(vec![], |v| vec![v]),
57 escape: options.escape.map_or(vec![], |v| vec![v]),
58 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
59 compression: options.compression as i32,
60 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
61 date_format: options.date_format.clone().unwrap_or_default(),
62 datetime_format: options.datetime_format.clone().unwrap_or_default(),
63 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
64 timestamp_tz_format: options
65 .timestamp_tz_format
66 .clone()
67 .unwrap_or_default(),
68 time_format: options.time_format.clone().unwrap_or_default(),
69 null_value: options.null_value.clone().unwrap_or_default(),
70 null_regex: options.null_regex.clone().unwrap_or_default(),
71 comment: options.comment.map_or(vec![], |v| vec![v]),
72 newlines_in_values: options
73 .newlines_in_values
74 .map_or(vec![], |v| vec![v as u8]),
75 }
76 } else {
77 CsvOptionsProto::default()
78 }
79 }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83 fn from(proto: &CsvOptionsProto) -> Self {
84 CsvOptions {
85 has_header: if !proto.has_header.is_empty() {
86 Some(proto.has_header[0] != 0)
87 } else {
88 None
89 },
90 delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91 quote: proto.quote.first().copied().unwrap_or(b'"'),
92 terminator: if !proto.terminator.is_empty() {
93 Some(proto.terminator[0])
94 } else {
95 None
96 },
97 escape: if !proto.escape.is_empty() {
98 Some(proto.escape[0])
99 } else {
100 None
101 },
102 double_quote: if !proto.double_quote.is_empty() {
103 Some(proto.double_quote[0] != 0)
104 } else {
105 None
106 },
107 compression: match proto.compression {
108 0 => CompressionTypeVariant::GZIP,
109 1 => CompressionTypeVariant::BZIP2,
110 2 => CompressionTypeVariant::XZ,
111 3 => CompressionTypeVariant::ZSTD,
112 _ => CompressionTypeVariant::UNCOMPRESSED,
113 },
114 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115 date_format: if proto.date_format.is_empty() {
116 None
117 } else {
118 Some(proto.date_format.clone())
119 },
120 datetime_format: if proto.datetime_format.is_empty() {
121 None
122 } else {
123 Some(proto.datetime_format.clone())
124 },
125 timestamp_format: if proto.timestamp_format.is_empty() {
126 None
127 } else {
128 Some(proto.timestamp_format.clone())
129 },
130 timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131 None
132 } else {
133 Some(proto.timestamp_tz_format.clone())
134 },
135 time_format: if proto.time_format.is_empty() {
136 None
137 } else {
138 Some(proto.time_format.clone())
139 },
140 null_value: if proto.null_value.is_empty() {
141 None
142 } else {
143 Some(proto.null_value.clone())
144 },
145 null_regex: if proto.null_regex.is_empty() {
146 None
147 } else {
148 Some(proto.null_regex.clone())
149 },
150 comment: if !proto.comment.is_empty() {
151 Some(proto.comment[0])
152 } else {
153 None
154 },
155 newlines_in_values: if proto.newlines_in_values.is_empty() {
156 None
157 } else {
158 Some(proto.newlines_in_values[0] != 0)
159 },
160 }
161 }
162}
163
164impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
166 fn try_decode(
167 &self,
168 _buf: &[u8],
169 _inputs: &[datafusion_expr::LogicalPlan],
170 _ctx: &SessionContext,
171 ) -> datafusion_common::Result<datafusion_expr::Extension> {
172 not_impl_err!("Method not implemented")
173 }
174
175 fn try_encode(
176 &self,
177 _node: &datafusion_expr::Extension,
178 _buf: &mut Vec<u8>,
179 ) -> datafusion_common::Result<()> {
180 not_impl_err!("Method not implemented")
181 }
182
183 fn try_decode_table_provider(
184 &self,
185 _buf: &[u8],
186 _table_ref: &TableReference,
187 _schema: arrow::datatypes::SchemaRef,
188 _ctx: &SessionContext,
189 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
190 not_impl_err!("Method not implemented")
191 }
192
193 fn try_encode_table_provider(
194 &self,
195 _table_ref: &TableReference,
196 _node: Arc<dyn datafusion::datasource::TableProvider>,
197 _buf: &mut Vec<u8>,
198 ) -> datafusion_common::Result<()> {
199 not_impl_err!("Method not implemented")
200 }
201
202 fn try_decode_file_format(
203 &self,
204 buf: &[u8],
205 _ctx: &SessionContext,
206 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
207 let proto = CsvOptionsProto::decode(buf).map_err(|e| {
208 DataFusionError::Execution(format!(
209 "Failed to decode CsvOptionsProto: {:?}",
210 e
211 ))
212 })?;
213 let options: CsvOptions = (&proto).into();
214 Ok(Arc::new(CsvFormatFactory {
215 options: Some(options),
216 }))
217 }
218
219 fn try_encode_file_format(
220 &self,
221 buf: &mut Vec<u8>,
222 node: Arc<dyn FileFormatFactory>,
223 ) -> datafusion_common::Result<()> {
224 let options =
225 if let Some(csv_factory) = node.as_any().downcast_ref::<CsvFormatFactory>() {
226 csv_factory.options.clone().unwrap_or_default()
227 } else {
228 return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
229 };
230
231 let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
232 options: Some(options),
233 });
234
235 proto.encode(buf).map_err(|e| {
236 DataFusionError::Execution(format!("Failed to encode CsvOptions: {:?}", e))
237 })?;
238
239 Ok(())
240 }
241}
242
243impl JsonOptionsProto {
244 fn from_factory(factory: &JsonFormatFactory) -> Self {
245 if let Some(options) = &factory.options {
246 JsonOptionsProto {
247 compression: options.compression as i32,
248 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
249 }
250 } else {
251 JsonOptionsProto::default()
252 }
253 }
254}
255
256impl From<&JsonOptionsProto> for JsonOptions {
257 fn from(proto: &JsonOptionsProto) -> Self {
258 JsonOptions {
259 compression: match proto.compression {
260 0 => CompressionTypeVariant::GZIP,
261 1 => CompressionTypeVariant::BZIP2,
262 2 => CompressionTypeVariant::XZ,
263 3 => CompressionTypeVariant::ZSTD,
264 _ => CompressionTypeVariant::UNCOMPRESSED,
265 },
266 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
267 }
268 }
269}
270
/// [`LogicalExtensionCodec`] that round-trips JSON file-format options
/// (`JsonOptions`) through their protobuf representation. Plan-node and
/// table-provider (de)serialization are not implemented by this codec.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
273
274impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
276 fn try_decode(
277 &self,
278 _buf: &[u8],
279 _inputs: &[datafusion_expr::LogicalPlan],
280 _ctx: &SessionContext,
281 ) -> datafusion_common::Result<datafusion_expr::Extension> {
282 not_impl_err!("Method not implemented")
283 }
284
285 fn try_encode(
286 &self,
287 _node: &datafusion_expr::Extension,
288 _buf: &mut Vec<u8>,
289 ) -> datafusion_common::Result<()> {
290 not_impl_err!("Method not implemented")
291 }
292
293 fn try_decode_table_provider(
294 &self,
295 _buf: &[u8],
296 _table_ref: &TableReference,
297 _schema: arrow::datatypes::SchemaRef,
298 _ctx: &SessionContext,
299 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
300 not_impl_err!("Method not implemented")
301 }
302
303 fn try_encode_table_provider(
304 &self,
305 _table_ref: &TableReference,
306 _node: Arc<dyn datafusion::datasource::TableProvider>,
307 _buf: &mut Vec<u8>,
308 ) -> datafusion_common::Result<()> {
309 not_impl_err!("Method not implemented")
310 }
311
312 fn try_decode_file_format(
313 &self,
314 buf: &[u8],
315 _ctx: &SessionContext,
316 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
317 let proto = JsonOptionsProto::decode(buf).map_err(|e| {
318 DataFusionError::Execution(format!(
319 "Failed to decode JsonOptionsProto: {:?}",
320 e
321 ))
322 })?;
323 let options: JsonOptions = (&proto).into();
324 Ok(Arc::new(JsonFormatFactory {
325 options: Some(options),
326 }))
327 }
328
329 fn try_encode_file_format(
330 &self,
331 buf: &mut Vec<u8>,
332 node: Arc<dyn FileFormatFactory>,
333 ) -> datafusion_common::Result<()> {
334 let options = if let Some(json_factory) =
335 node.as_any().downcast_ref::<JsonFormatFactory>()
336 {
337 json_factory.options.clone().unwrap_or_default()
338 } else {
339 return Err(DataFusionError::Execution(
340 "Unsupported FileFormatFactory type".to_string(),
341 ));
342 };
343
344 let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
345 options: Some(options),
346 });
347
348 proto.encode(buf).map_err(|e| {
349 DataFusionError::Execution(format!("Failed to encode JsonOptions: {:?}", e))
350 })?;
351
352 Ok(())
353 }
354}
355
impl TableParquetOptionsProto {
    /// Convert a [`ParquetFormatFactory`]'s options into their protobuf form.
    ///
    /// Optional scalar settings are wrapped in single-variant protobuf `oneof`
    /// messages (`*_opt` fields); `usize` values are widened to `u64` for the
    /// wire format. A factory without options encodes as the default proto.
    fn from_factory(factory: &ParquetFormatFactory) -> Self {
        let global_options = if let Some(ref options) = factory.options {
            options.clone()
        } else {
            return TableParquetOptionsProto::default();
        };

        let column_specific_options = global_options.column_specific_options;
        // allow(deprecated): some fields (e.g. max_statistics_size) appear to be
        // deprecated upstream but must still round-trip — TODO confirm.
        #[allow(deprecated)] TableParquetOptionsProto {
            global: Some(ParquetOptionsProto {
                enable_page_index: global_options.global.enable_page_index,
                pruning: global_options.global.pruning,
                skip_metadata: global_options.global.skip_metadata,
                metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
                    parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                }),
                pushdown_filters: global_options.global.pushdown_filters,
                reorder_filters: global_options.global.reorder_filters,
                data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
                write_batch_size: global_options.global.write_batch_size as u64,
                writer_version: global_options.global.writer_version.clone(),
                compression_opt: global_options.global.compression.map(|compression| {
                    parquet_options::CompressionOpt::Compression(compression)
                }),
                dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| {
                    parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                }),
                dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64,
                statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| {
                    parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                }),
                max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| {
                    parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64)
                }),
                max_row_group_size: global_options.global.max_row_group_size as u64,
                created_by: global_options.global.created_by.clone(),
                column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
                    parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
                }),
                statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
                    parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
                }),
                data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
                encoding_opt: global_options.global.encoding.map(|encoding| {
                    parquet_options::EncodingOpt::Encoding(encoding)
                }),
                bloom_filter_on_read: global_options.global.bloom_filter_on_read,
                bloom_filter_on_write: global_options.global.bloom_filter_on_write,
                bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| {
                    parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                }),
                bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| {
                    parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                }),
                allow_single_file_parallelism: global_options.global.allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64,
                maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64,
                schema_force_view_types: global_options.global.schema_force_view_types,
                binary_as_string: global_options.global.binary_as_string,
                skip_arrow_metadata: global_options.global.skip_arrow_metadata,
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
            }),
            // Per-column overrides keyed by column name.
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
                    column_name,
                    options: Some(ParquetColumnOptionsProto {
                        bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| {
                            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled)
                        }),
                        encoding_opt: options.encoding.map(|encoding| {
                            parquet_column_options::EncodingOpt::Encoding(encoding)
                        }),
                        dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| {
                            parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)
                        }),
                        compression_opt: options.compression.map(|compression| {
                            parquet_column_options::CompressionOpt::Compression(compression)
                        }),
                        statistics_enabled_opt: options.statistics_enabled.map(|enabled| {
                            parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled)
                        }),
                        bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| {
                            parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp)
                        }),
                        bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| {
                            parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)
                        }),
                        max_statistics_size_opt: options.max_statistics_size.map(|size| {
                            parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32)
                        }),
                    })
                }
            }).collect(),
            // Proto maps cannot hold optional values, so `None` metadata
            // entries are dropped here; the reverse conversion re-wraps every
            // value in `Some`.
            key_value_metadata: global_options.key_value_metadata
                .iter()
                .filter_map(|(key, value)| {
                    value.as_ref().map(|v| (key.clone(), v.clone()))
                })
                .collect(),
        }
    }
}
462
impl From<&ParquetOptionsProto> for ParquetOptions {
    /// Reconstruct [`ParquetOptions`] from its protobuf form.
    ///
    /// Each `*_opt` field is a single-variant `oneof` wrapper; the irrefutable
    /// `match` arms simply unwrap it. Wire-format `u64` values are narrowed
    /// back to `usize`.
    fn from(proto: &ParquetOptionsProto) -> Self {
        // allow(deprecated): mirrors the encoding side — TODO confirm which
        // fields carry the deprecation upstream.
        #[allow(deprecated)] ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
                parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
            }),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.clone(),
            compression: proto.compression_opt.as_ref().map(|opt| match opt {
                parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
            }),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
            }),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
            }),
            max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt {
                parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize,
            }),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
            }),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
                parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
            }),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
                parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
            }),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
            }),
            bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
                parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
            }),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
            maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
        }
    }
}
523
524impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
525 fn from(proto: ParquetColumnOptionsProto) -> Self {
526 #[allow(deprecated)] ParquetColumnOptions {
528 bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
529 |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
530 ),
531 encoding: proto
532 .encoding_opt
533 .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
534 dictionary_enabled: proto.dictionary_enabled_opt.map(
535 |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
536 ),
537 compression: proto
538 .compression_opt
539 .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
540 statistics_enabled: proto.statistics_enabled_opt.map(
541 |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
542 ),
543 bloom_filter_fpp: proto
544 .bloom_filter_fpp_opt
545 .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
546 bloom_filter_ndv: proto
547 .bloom_filter_ndv_opt
548 .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
549 max_statistics_size: proto.max_statistics_size_opt.map(
550 |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| {
551 v as usize
552 },
553 ),
554 }
555 }
556}
557
558impl From<&TableParquetOptionsProto> for TableParquetOptions {
559 fn from(proto: &TableParquetOptionsProto) -> Self {
560 TableParquetOptions {
561 global: proto
562 .global
563 .as_ref()
564 .map(ParquetOptions::from)
565 .unwrap_or_default(),
566 column_specific_options: proto
567 .column_specific_options
568 .iter()
569 .map(|parquet_column_options| {
570 (
571 parquet_column_options.column_name.clone(),
572 ParquetColumnOptions::from(
573 parquet_column_options.options.clone().unwrap_or_default(),
574 ),
575 )
576 })
577 .collect(),
578 key_value_metadata: proto
579 .key_value_metadata
580 .iter()
581 .map(|(k, v)| (k.clone(), Some(v.clone())))
582 .collect(),
583 }
584 }
585}
586
/// [`LogicalExtensionCodec`] that round-trips Parquet file-format options
/// (`TableParquetOptions`) through their protobuf representation. Plan-node
/// and table-provider (de)serialization are not implemented by this codec.
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
589
590impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
592 fn try_decode(
593 &self,
594 _buf: &[u8],
595 _inputs: &[datafusion_expr::LogicalPlan],
596 _ctx: &SessionContext,
597 ) -> datafusion_common::Result<datafusion_expr::Extension> {
598 not_impl_err!("Method not implemented")
599 }
600
601 fn try_encode(
602 &self,
603 _node: &datafusion_expr::Extension,
604 _buf: &mut Vec<u8>,
605 ) -> datafusion_common::Result<()> {
606 not_impl_err!("Method not implemented")
607 }
608
609 fn try_decode_table_provider(
610 &self,
611 _buf: &[u8],
612 _table_ref: &TableReference,
613 _schema: arrow::datatypes::SchemaRef,
614 _ctx: &SessionContext,
615 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
616 not_impl_err!("Method not implemented")
617 }
618
619 fn try_encode_table_provider(
620 &self,
621 _table_ref: &TableReference,
622 _node: Arc<dyn datafusion::datasource::TableProvider>,
623 _buf: &mut Vec<u8>,
624 ) -> datafusion_common::Result<()> {
625 not_impl_err!("Method not implemented")
626 }
627
628 fn try_decode_file_format(
629 &self,
630 buf: &[u8],
631 _ctx: &SessionContext,
632 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
633 let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
634 DataFusionError::Execution(format!(
635 "Failed to decode TableParquetOptionsProto: {:?}",
636 e
637 ))
638 })?;
639 let options: TableParquetOptions = (&proto).into();
640 Ok(Arc::new(ParquetFormatFactory {
641 options: Some(options),
642 }))
643 }
644
645 fn try_encode_file_format(
646 &self,
647 buf: &mut Vec<u8>,
648 node: Arc<dyn FileFormatFactory>,
649 ) -> datafusion_common::Result<()> {
650 let options = if let Some(parquet_factory) =
651 node.as_any().downcast_ref::<ParquetFormatFactory>()
652 {
653 parquet_factory.options.clone().unwrap_or_default()
654 } else {
655 return Err(DataFusionError::Execution(
656 "Unsupported FileFormatFactory type".to_string(),
657 ));
658 };
659
660 let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
661 options: Some(options),
662 });
663
664 proto.encode(buf).map_err(|e| {
665 DataFusionError::Execution(format!(
666 "Failed to encode TableParquetOptionsProto: {:?}",
667 e
668 ))
669 })?;
670
671 Ok(())
672 }
673}
674
/// [`LogicalExtensionCodec`] for the Arrow IPC file format. The format carries
/// no serializable options, so encoding is a no-op and decoding returns a
/// fresh default factory.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
677
678impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
680 fn try_decode(
681 &self,
682 _buf: &[u8],
683 _inputs: &[datafusion_expr::LogicalPlan],
684 _ctx: &SessionContext,
685 ) -> datafusion_common::Result<datafusion_expr::Extension> {
686 not_impl_err!("Method not implemented")
687 }
688
689 fn try_encode(
690 &self,
691 _node: &datafusion_expr::Extension,
692 _buf: &mut Vec<u8>,
693 ) -> datafusion_common::Result<()> {
694 not_impl_err!("Method not implemented")
695 }
696
697 fn try_decode_table_provider(
698 &self,
699 _buf: &[u8],
700 _table_ref: &TableReference,
701 _schema: arrow::datatypes::SchemaRef,
702 _ctx: &SessionContext,
703 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
704 not_impl_err!("Method not implemented")
705 }
706
707 fn try_encode_table_provider(
708 &self,
709 _table_ref: &TableReference,
710 _node: Arc<dyn datafusion::datasource::TableProvider>,
711 _buf: &mut Vec<u8>,
712 ) -> datafusion_common::Result<()> {
713 not_impl_err!("Method not implemented")
714 }
715
716 fn try_decode_file_format(
717 &self,
718 __buf: &[u8],
719 __ctx: &SessionContext,
720 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
721 Ok(Arc::new(ArrowFormatFactory::new()))
722 }
723
724 fn try_encode_file_format(
725 &self,
726 __buf: &mut Vec<u8>,
727 __node: Arc<dyn FileFormatFactory>,
728 ) -> datafusion_common::Result<()> {
729 Ok(())
730 }
731}
732
/// [`LogicalExtensionCodec`] for the Avro file format. Like the Arrow codec,
/// it serializes no options; see the review note on `try_decode_file_format`
/// in its impl about the factory type it returns.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
735
736impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
738 fn try_decode(
739 &self,
740 _buf: &[u8],
741 _inputs: &[datafusion_expr::LogicalPlan],
742 _ctx: &SessionContext,
743 ) -> datafusion_common::Result<datafusion_expr::Extension> {
744 not_impl_err!("Method not implemented")
745 }
746
747 fn try_encode(
748 &self,
749 _node: &datafusion_expr::Extension,
750 _buf: &mut Vec<u8>,
751 ) -> datafusion_common::Result<()> {
752 not_impl_err!("Method not implemented")
753 }
754
755 fn try_decode_table_provider(
756 &self,
757 _buf: &[u8],
758 _table_ref: &TableReference,
759 _schema: arrow::datatypes::SchemaRef,
760 _cts: &SessionContext,
761 ) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
762 not_impl_err!("Method not implemented")
763 }
764
765 fn try_encode_table_provider(
766 &self,
767 _table_ref: &TableReference,
768 _node: Arc<dyn datafusion::datasource::TableProvider>,
769 _buf: &mut Vec<u8>,
770 ) -> datafusion_common::Result<()> {
771 not_impl_err!("Method not implemented")
772 }
773
774 fn try_decode_file_format(
775 &self,
776 __buf: &[u8],
777 __ctx: &SessionContext,
778 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
779 Ok(Arc::new(ArrowFormatFactory::new()))
780 }
781
782 fn try_encode_file_format(
783 &self,
784 __buf: &mut Vec<u8>,
785 __node: Arc<dyn FileFormatFactory>,
786 ) -> datafusion_common::Result<()> {
787 Ok(())
788 }
789}