1use std::sync::Arc;
19
20use super::LogicalExtensionCodec;
21use crate::protobuf::{
22 CsvOptions as CsvOptionsProto, CsvQuoteStyle as CsvQuoteStyleProto,
23 JsonOptions as JsonOptionsProto,
24};
25use datafusion_common::config::{CsvOptions, JsonOptions};
26use datafusion_common::{
27 TableReference, exec_datafusion_err, exec_err, not_impl_err,
28 parsers::{CompressionTypeVariant, CsvQuoteStyle},
29};
30use datafusion_datasource::file_format::FileFormatFactory;
31use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
32use datafusion_datasource_csv::file_format::CsvFormatFactory;
33use datafusion_datasource_json::file_format::JsonFormatFactory;
34use datafusion_execution::TaskContext;
35use prost::Message;
36
/// [`LogicalExtensionCodec`] that serializes the options of a
/// [`CsvFormatFactory`] as a `CsvOptionsProto` protobuf message.
///
/// Only the file-format methods are implemented; extension nodes and table
/// providers are rejected with a "not implemented" error.
#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;
39
40impl CsvOptionsProto {
41 fn from_factory(factory: &CsvFormatFactory) -> Self {
42 if let Some(options) = &factory.options {
43 CsvOptionsProto {
44 has_header: options.has_header.map_or(vec![], |v| vec![v as u8]),
45 delimiter: vec![options.delimiter],
46 quote: vec![options.quote],
47 terminator: options.terminator.map_or(vec![], |v| vec![v]),
48 escape: options.escape.map_or(vec![], |v| vec![v]),
49 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
50 compression: options.compression as i32,
51 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
52 date_format: options.date_format.clone().unwrap_or_default(),
53 datetime_format: options.datetime_format.clone().unwrap_or_default(),
54 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
55 timestamp_tz_format: options
56 .timestamp_tz_format
57 .clone()
58 .unwrap_or_default(),
59 time_format: options.time_format.clone().unwrap_or_default(),
60 null_value: options.null_value.clone().unwrap_or_default(),
61 null_regex: options.null_regex.clone().unwrap_or_default(),
62 comment: options.comment.map_or(vec![], |v| vec![v]),
63 newlines_in_values: options
64 .newlines_in_values
65 .map_or(vec![], |v| vec![v as u8]),
66 truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
67 compression_level: options.compression_level,
68 quote_style: options.quote_style as i32,
69 ignore_leading_whitespace: options
70 .ignore_leading_whitespace
71 .map_or(vec![], |v| vec![v as u8]),
72 ignore_trailing_whitespace: options
73 .ignore_trailing_whitespace
74 .map_or(vec![], |v| vec![v as u8]),
75 }
76 } else {
77 CsvOptionsProto::default()
78 }
79 }
80}
81
82impl From<&CsvOptionsProto> for CsvOptions {
83 fn from(proto: &CsvOptionsProto) -> Self {
84 CsvOptions {
85 has_header: if !proto.has_header.is_empty() {
86 Some(proto.has_header[0] != 0)
87 } else {
88 None
89 },
90 delimiter: proto.delimiter.first().copied().unwrap_or(b','),
91 quote: proto.quote.first().copied().unwrap_or(b'"'),
92 terminator: if !proto.terminator.is_empty() {
93 Some(proto.terminator[0])
94 } else {
95 None
96 },
97 escape: if !proto.escape.is_empty() {
98 Some(proto.escape[0])
99 } else {
100 None
101 },
102 double_quote: if !proto.double_quote.is_empty() {
103 Some(proto.double_quote[0] != 0)
104 } else {
105 None
106 },
107 compression: match proto.compression {
108 0 => CompressionTypeVariant::GZIP,
109 1 => CompressionTypeVariant::BZIP2,
110 2 => CompressionTypeVariant::XZ,
111 3 => CompressionTypeVariant::ZSTD,
112 _ => CompressionTypeVariant::UNCOMPRESSED,
113 },
114 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
115 date_format: if proto.date_format.is_empty() {
116 None
117 } else {
118 Some(proto.date_format.clone())
119 },
120 datetime_format: if proto.datetime_format.is_empty() {
121 None
122 } else {
123 Some(proto.datetime_format.clone())
124 },
125 timestamp_format: if proto.timestamp_format.is_empty() {
126 None
127 } else {
128 Some(proto.timestamp_format.clone())
129 },
130 timestamp_tz_format: if proto.timestamp_tz_format.is_empty() {
131 None
132 } else {
133 Some(proto.timestamp_tz_format.clone())
134 },
135 time_format: if proto.time_format.is_empty() {
136 None
137 } else {
138 Some(proto.time_format.clone())
139 },
140 null_value: if proto.null_value.is_empty() {
141 None
142 } else {
143 Some(proto.null_value.clone())
144 },
145 null_regex: if proto.null_regex.is_empty() {
146 None
147 } else {
148 Some(proto.null_regex.clone())
149 },
150 comment: if !proto.comment.is_empty() {
151 Some(proto.comment[0])
152 } else {
153 None
154 },
155 newlines_in_values: if proto.newlines_in_values.is_empty() {
156 None
157 } else {
158 Some(proto.newlines_in_values[0] != 0)
159 },
160 truncated_rows: if proto.truncated_rows.is_empty() {
161 None
162 } else {
163 Some(proto.truncated_rows[0] != 0)
164 },
165 compression_level: proto.compression_level,
166 quote_style: match CsvQuoteStyleProto::try_from(proto.quote_style) {
167 Ok(CsvQuoteStyleProto::Always) => CsvQuoteStyle::Always,
168 Ok(CsvQuoteStyleProto::NonNumeric) => CsvQuoteStyle::NonNumeric,
169 Ok(CsvQuoteStyleProto::Never) => CsvQuoteStyle::Never,
170 Ok(CsvQuoteStyleProto::Necessary) => CsvQuoteStyle::Necessary,
171 _ => CsvQuoteStyle::Necessary,
172 },
173 ignore_leading_whitespace: if proto.ignore_leading_whitespace.is_empty() {
174 None
175 } else {
176 Some(proto.ignore_leading_whitespace[0] != 0)
177 },
178 ignore_trailing_whitespace: if proto.ignore_trailing_whitespace.is_empty() {
179 None
180 } else {
181 Some(proto.ignore_trailing_whitespace[0] != 0)
182 },
183 }
184 }
185}
186
187impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
189 fn try_decode(
190 &self,
191 _buf: &[u8],
192 _inputs: &[datafusion_expr::LogicalPlan],
193 _ctx: &TaskContext,
194 ) -> datafusion_common::Result<datafusion_expr::Extension> {
195 not_impl_err!("Method not implemented")
196 }
197
198 fn try_encode(
199 &self,
200 _node: &datafusion_expr::Extension,
201 _buf: &mut Vec<u8>,
202 ) -> datafusion_common::Result<()> {
203 not_impl_err!("Method not implemented")
204 }
205
206 fn try_decode_table_provider(
207 &self,
208 _buf: &[u8],
209 _table_ref: &TableReference,
210 _schema: arrow::datatypes::SchemaRef,
211 _ctx: &TaskContext,
212 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
213 not_impl_err!("Method not implemented")
214 }
215
216 fn try_encode_table_provider(
217 &self,
218 _table_ref: &TableReference,
219 _node: Arc<dyn datafusion_catalog::TableProvider>,
220 _buf: &mut Vec<u8>,
221 ) -> datafusion_common::Result<()> {
222 not_impl_err!("Method not implemented")
223 }
224
225 fn try_decode_file_format(
226 &self,
227 buf: &[u8],
228 _ctx: &TaskContext,
229 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
230 let proto = CsvOptionsProto::decode(buf).map_err(|e| {
231 exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}")
232 })?;
233 let options: CsvOptions = (&proto).into();
234 Ok(Arc::new(CsvFormatFactory {
235 options: Some(options),
236 }))
237 }
238
239 fn try_encode_file_format(
240 &self,
241 buf: &mut Vec<u8>,
242 node: Arc<dyn FileFormatFactory>,
243 ) -> datafusion_common::Result<()> {
244 let options = if let Some(csv_factory) = node.downcast_ref::<CsvFormatFactory>() {
245 csv_factory.options.clone().unwrap_or_default()
246 } else {
247 return exec_err!("{}", "Unsupported FileFormatFactory type".to_string());
248 };
249
250 let proto = CsvOptionsProto::from_factory(&CsvFormatFactory {
251 options: Some(options),
252 });
253
254 proto
255 .encode(buf)
256 .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?;
257
258 Ok(())
259 }
260}
261
262impl JsonOptionsProto {
263 fn from_factory(factory: &JsonFormatFactory) -> Self {
264 if let Some(options) = &factory.options {
265 JsonOptionsProto {
266 compression: options.compression as i32,
267 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
268 compression_level: options.compression_level,
269 newline_delimited: Some(options.newline_delimited),
270 }
271 } else {
272 JsonOptionsProto::default()
273 }
274 }
275}
276
277impl From<&JsonOptionsProto> for JsonOptions {
278 fn from(proto: &JsonOptionsProto) -> Self {
279 JsonOptions {
280 compression: match proto.compression {
281 0 => CompressionTypeVariant::GZIP,
282 1 => CompressionTypeVariant::BZIP2,
283 2 => CompressionTypeVariant::XZ,
284 3 => CompressionTypeVariant::ZSTD,
285 _ => CompressionTypeVariant::UNCOMPRESSED,
286 },
287 schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
288 compression_level: proto.compression_level,
289 newline_delimited: proto.newline_delimited.unwrap_or(true),
290 }
291 }
292}
293
/// [`LogicalExtensionCodec`] that serializes the options of a
/// [`JsonFormatFactory`] as a `JsonOptionsProto` protobuf message.
///
/// Only the file-format methods are implemented; extension nodes and table
/// providers are rejected with a "not implemented" error.
#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;
296
297impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
299 fn try_decode(
300 &self,
301 _buf: &[u8],
302 _inputs: &[datafusion_expr::LogicalPlan],
303 _ctx: &TaskContext,
304 ) -> datafusion_common::Result<datafusion_expr::Extension> {
305 not_impl_err!("Method not implemented")
306 }
307
308 fn try_encode(
309 &self,
310 _node: &datafusion_expr::Extension,
311 _buf: &mut Vec<u8>,
312 ) -> datafusion_common::Result<()> {
313 not_impl_err!("Method not implemented")
314 }
315
316 fn try_decode_table_provider(
317 &self,
318 _buf: &[u8],
319 _table_ref: &TableReference,
320 _schema: arrow::datatypes::SchemaRef,
321 _ctx: &TaskContext,
322 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
323 not_impl_err!("Method not implemented")
324 }
325
326 fn try_encode_table_provider(
327 &self,
328 _table_ref: &TableReference,
329 _node: Arc<dyn datafusion_catalog::TableProvider>,
330 _buf: &mut Vec<u8>,
331 ) -> datafusion_common::Result<()> {
332 not_impl_err!("Method not implemented")
333 }
334
335 fn try_decode_file_format(
336 &self,
337 buf: &[u8],
338 _ctx: &TaskContext,
339 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
340 let proto = JsonOptionsProto::decode(buf).map_err(|e| {
341 exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}")
342 })?;
343 let options: JsonOptions = (&proto).into();
344 Ok(Arc::new(JsonFormatFactory {
345 options: Some(options),
346 }))
347 }
348
349 fn try_encode_file_format(
350 &self,
351 buf: &mut Vec<u8>,
352 node: Arc<dyn FileFormatFactory>,
353 ) -> datafusion_common::Result<()> {
354 let options = if let Some(json_factory) = node.downcast_ref::<JsonFormatFactory>()
355 {
356 json_factory.options.clone().unwrap_or_default()
357 } else {
358 return exec_err!("Unsupported FileFormatFactory type");
359 };
360
361 let proto = JsonOptionsProto::from_factory(&JsonFormatFactory {
362 options: Some(options),
363 });
364
365 proto
366 .encode(buf)
367 .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?;
368
369 Ok(())
370 }
371}
372
#[cfg(feature = "parquet")]
mod parquet {
    use super::*;

    use crate::protobuf::{
        ParquetCdcOptions as ParquetCdcOptionsProto,
        ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
        ParquetOptions as ParquetOptionsProto,
        TableParquetOptions as TableParquetOptionsProto, parquet_column_options,
        parquet_options,
    };
    use datafusion_common::config::{
        ParquetCdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions,
    };
    use datafusion_datasource_parquet::file_format::ParquetFormatFactory;

    /// Panic message of the infallible `From` conversions below. The only way
    /// those conversions can fail is an unparsable `writer_version`.
    const INVALID_WRITER_VERSION: &str =
        "Invalid parquet writer version in proto, expected '1.0' or '2.0'";

    impl TableParquetOptionsProto {
        /// Builds the protobuf form of the Parquet options held by `factory`.
        ///
        /// A factory without options yields `TableParquetOptionsProto::default()`.
        /// `key_value_metadata` entries whose value is `None` are not encoded
        /// (the protobuf map only stores string values), so they are absent
        /// after a decode.
        fn from_factory(factory: &ParquetFormatFactory) -> Self {
            let Some(options) = factory.options.as_ref() else {
                return TableParquetOptionsProto::default();
            };

            // Work on an owned copy so individual fields can be moved into the proto.
            let TableParquetOptions {
                global,
                column_specific_options,
                key_value_metadata,
                ..
            } = options.clone();

            TableParquetOptionsProto {
                global: Some(ParquetOptionsProto {
                    enable_page_index: global.enable_page_index,
                    pruning: global.pruning,
                    skip_metadata: global.skip_metadata,
                    metadata_size_hint_opt: global.metadata_size_hint.map(|size| {
                        parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64)
                    }),
                    pushdown_filters: global.pushdown_filters,
                    reorder_filters: global.reorder_filters,
                    force_filter_selections: global.force_filter_selections,
                    data_pagesize_limit: global.data_pagesize_limit as u64,
                    write_batch_size: global.write_batch_size as u64,
                    writer_version: global.writer_version.to_string(),
                    compression_opt: global
                        .compression
                        .map(parquet_options::CompressionOpt::Compression),
                    dictionary_enabled_opt: global
                        .dictionary_enabled
                        .map(parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
                    dictionary_page_size_limit: global.dictionary_page_size_limit as u64,
                    statistics_enabled_opt: global
                        .statistics_enabled
                        .map(parquet_options::StatisticsEnabledOpt::StatisticsEnabled),
                    max_row_group_size: global.max_row_group_size as u64,
                    created_by: global.created_by,
                    column_index_truncate_length_opt: global
                        .column_index_truncate_length
                        .map(|length| {
                            parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(
                                length as u64,
                            )
                        }),
                    statistics_truncate_length_opt: global
                        .statistics_truncate_length
                        .map(|length| {
                            parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(
                                length as u64,
                            )
                        }),
                    data_page_row_count_limit: global.data_page_row_count_limit as u64,
                    encoding_opt: global
                        .encoding
                        .map(parquet_options::EncodingOpt::Encoding),
                    bloom_filter_on_read: global.bloom_filter_on_read,
                    bloom_filter_on_write: global.bloom_filter_on_write,
                    bloom_filter_fpp_opt: global
                        .bloom_filter_fpp
                        .map(parquet_options::BloomFilterFppOpt::BloomFilterFpp),
                    bloom_filter_ndv_opt: global
                        .bloom_filter_ndv
                        .map(parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
                    allow_single_file_parallelism: global.allow_single_file_parallelism,
                    maximum_parallel_row_group_writers: global
                        .maximum_parallel_row_group_writers
                        as u64,
                    maximum_buffered_record_batches_per_stream: global
                        .maximum_buffered_record_batches_per_stream
                        as u64,
                    schema_force_view_types: global.schema_force_view_types,
                    binary_as_string: global.binary_as_string,
                    skip_arrow_metadata: global.skip_arrow_metadata,
                    coerce_int96_opt: global
                        .coerce_int96
                        .map(parquet_options::CoerceInt96Opt::CoerceInt96),
                    coerce_int96_tz_opt: global
                        .coerce_int96_tz
                        .map(parquet_options::CoerceInt96TzOpt::CoerceInt96Tz),
                    max_predicate_cache_size_opt: global.max_predicate_cache_size.map(
                        |size| {
                            parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(
                                size as u64,
                            )
                        },
                    ),
                    content_defined_chunking: Some(ParquetCdcOptionsProto {
                        enabled: global.content_defined_chunking.enabled,
                        min_chunk_size: global.content_defined_chunking.min_chunk_size
                            as u64,
                        max_chunk_size: global.content_defined_chunking.max_chunk_size
                            as u64,
                        norm_level: global.content_defined_chunking.norm_level,
                    }),
                }),
                column_specific_options: column_specific_options
                    .into_iter()
                    .map(|(column_name, column_options)| ParquetColumnSpecificOptions {
                        column_name,
                        options: Some(ParquetColumnOptionsProto {
                            bloom_filter_enabled_opt: column_options.bloom_filter_enabled.map(
                                parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled,
                            ),
                            encoding_opt: column_options
                                .encoding
                                .map(parquet_column_options::EncodingOpt::Encoding),
                            dictionary_enabled_opt: column_options.dictionary_enabled.map(
                                parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled,
                            ),
                            compression_opt: column_options
                                .compression
                                .map(parquet_column_options::CompressionOpt::Compression),
                            statistics_enabled_opt: column_options.statistics_enabled.map(
                                parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled,
                            ),
                            bloom_filter_fpp_opt: column_options
                                .bloom_filter_fpp
                                .map(parquet_column_options::BloomFilterFppOpt::BloomFilterFpp),
                            bloom_filter_ndv_opt: column_options
                                .bloom_filter_ndv
                                .map(parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv),
                        }),
                    })
                    .collect(),
                // `None` values cannot be represented in the protobuf map.
                key_value_metadata: key_value_metadata
                    .into_iter()
                    .filter_map(|(key, value)| value.map(|value| (key, value)))
                    .collect(),
            }
        }
    }

    /// Converts a decoded [`ParquetOptionsProto`] into [`ParquetOptions`].
    ///
    /// # Errors
    /// Returns an execution error if `writer_version` cannot be parsed. The
    /// proto may come from arbitrary bytes, so this conversion must not panic.
    fn parquet_options_from_proto(
        proto: &ParquetOptionsProto,
    ) -> datafusion_common::Result<ParquetOptions> {
        Ok(ParquetOptions {
            enable_page_index: proto.enable_page_index,
            pruning: proto.pruning,
            skip_metadata: proto.skip_metadata,
            metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(
                |parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size)| *size as usize,
            ),
            pushdown_filters: proto.pushdown_filters,
            reorder_filters: proto.reorder_filters,
            force_filter_selections: proto.force_filter_selections,
            data_pagesize_limit: proto.data_pagesize_limit as usize,
            write_batch_size: proto.write_batch_size as usize,
            writer_version: proto.writer_version.parse().map_err(|_| {
                exec_datafusion_err!(
                    "Invalid parquet writer version in proto: '{}', expected '1.0' or '2.0'",
                    proto.writer_version
                )
            })?,
            compression: proto.compression_opt.as_ref().map(
                |parquet_options::CompressionOpt::Compression(compression)| compression.clone(),
            ),
            dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(
                |parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled)| *enabled,
            ),
            dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
            statistics_enabled: proto.statistics_enabled_opt.as_ref().map(
                |parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics)| {
                    statistics.clone()
                },
            ),
            max_row_group_size: proto.max_row_group_size as usize,
            created_by: proto.created_by.clone(),
            column_index_truncate_length: proto
                .column_index_truncate_length_opt
                .as_ref()
                .map(
                    |parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(
                        length,
                    )| *length as usize,
                ),
            statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(
                |parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(
                    length,
                )| *length as usize,
            ),
            data_page_row_count_limit: proto.data_page_row_count_limit as usize,
            encoding: proto.encoding_opt.as_ref().map(
                |parquet_options::EncodingOpt::Encoding(encoding)| encoding.clone(),
            ),
            bloom_filter_on_read: proto.bloom_filter_on_read,
            bloom_filter_on_write: proto.bloom_filter_on_write,
            bloom_filter_fpp: proto
                .bloom_filter_fpp_opt
                .as_ref()
                .map(|parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp)| *fpp),
            bloom_filter_ndv: proto
                .bloom_filter_ndv_opt
                .as_ref()
                .map(|parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv)| *ndv),
            allow_single_file_parallelism: proto.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers
                as usize,
            maximum_buffered_record_batches_per_stream: proto
                .maximum_buffered_record_batches_per_stream
                as usize,
            schema_force_view_types: proto.schema_force_view_types,
            binary_as_string: proto.binary_as_string,
            skip_arrow_metadata: proto.skip_arrow_metadata,
            coerce_int96: proto.coerce_int96_opt.as_ref().map(
                |parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96)| {
                    coerce_int96.clone()
                },
            ),
            coerce_int96_tz: proto
                .coerce_int96_tz_opt
                .as_ref()
                .map(|parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz)| tz.clone()),
            max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(
                |parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size)| {
                    *size as usize
                },
            ),
            content_defined_chunking: proto
                .content_defined_chunking
                .as_ref()
                .map(|cdc| ParquetCdcOptions {
                    enabled: cdc.enabled,
                    min_chunk_size: cdc.min_chunk_size as usize,
                    max_chunk_size: cdc.max_chunk_size as usize,
                    norm_level: cdc.norm_level,
                })
                .unwrap_or_default(),
        })
    }

    impl From<&ParquetOptionsProto> for ParquetOptions {
        /// Infallible conversion kept for existing callers.
        ///
        /// # Panics
        /// Panics if `writer_version` cannot be parsed; decoding paths that
        /// handle untrusted bytes use the fallible conversion instead.
        fn from(proto: &ParquetOptionsProto) -> Self {
            parquet_options_from_proto(proto).expect(INVALID_WRITER_VERSION)
        }
    }

    impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
        /// Unwraps each single-variant protobuf `oneof` into a plain `Option`.
        fn from(proto: ParquetColumnOptionsProto) -> Self {
            ParquetColumnOptions {
                bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
                    |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
                ),
                encoding: proto
                    .encoding_opt
                    .map(|parquet_column_options::EncodingOpt::Encoding(v)| v),
                dictionary_enabled: proto.dictionary_enabled_opt.map(
                    |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v,
                ),
                compression: proto
                    .compression_opt
                    .map(|parquet_column_options::CompressionOpt::Compression(v)| v),
                statistics_enabled: proto.statistics_enabled_opt.map(
                    |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v,
                ),
                bloom_filter_fpp: proto
                    .bloom_filter_fpp_opt
                    .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v),
                bloom_filter_ndv: proto
                    .bloom_filter_ndv_opt
                    .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
            }
        }
    }

    /// Converts a decoded [`TableParquetOptionsProto`] into [`TableParquetOptions`].
    ///
    /// A missing `global` section decodes as `ParquetOptions::default()`,
    /// column entries without options decode as default column options, and
    /// every metadata value decodes as `Some`. Fields not carried by the proto
    /// take their `Default` values.
    ///
    /// # Errors
    /// Returns an execution error if the global `writer_version` cannot be parsed.
    fn table_parquet_options_from_proto(
        proto: &TableParquetOptionsProto,
    ) -> datafusion_common::Result<TableParquetOptions> {
        let global = match proto.global.as_ref() {
            Some(global) => parquet_options_from_proto(global)?,
            None => ParquetOptions::default(),
        };

        Ok(TableParquetOptions {
            global,
            column_specific_options: proto
                .column_specific_options
                .iter()
                .map(|column| {
                    (
                        column.column_name.clone(),
                        ParquetColumnOptions::from(
                            column.options.clone().unwrap_or_default(),
                        ),
                    )
                })
                .collect(),
            key_value_metadata: proto
                .key_value_metadata
                .iter()
                .map(|(key, value)| (key.clone(), Some(value.clone())))
                .collect(),
            ..Default::default()
        })
    }

    impl From<&TableParquetOptionsProto> for TableParquetOptions {
        /// Infallible conversion kept for existing callers.
        ///
        /// # Panics
        /// Panics if the global `writer_version` cannot be parsed.
        fn from(proto: &TableParquetOptionsProto) -> Self {
            table_parquet_options_from_proto(proto).expect(INVALID_WRITER_VERSION)
        }
    }

    /// [`LogicalExtensionCodec`] that serializes the options of a
    /// [`ParquetFormatFactory`] as a `TableParquetOptionsProto` message.
    #[derive(Debug)]
    pub struct ParquetLogicalExtensionCodec;

    impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
        /// Extension nodes are not supported by this codec.
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[datafusion_expr::LogicalPlan],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<datafusion_expr::Extension> {
            not_impl_err!("Method not implemented")
        }

        /// Extension nodes are not supported by this codec.
        fn try_encode(
            &self,
            _node: &datafusion_expr::Extension,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        /// Table providers are not supported by this codec.
        fn try_decode_table_provider(
            &self,
            _buf: &[u8],
            _table_ref: &TableReference,
            _schema: arrow::datatypes::SchemaRef,
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>>
        {
            not_impl_err!("Method not implemented")
        }

        /// Table providers are not supported by this codec.
        fn try_encode_table_provider(
            &self,
            _table_ref: &TableReference,
            _node: Arc<dyn datafusion_catalog::TableProvider>,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        /// Decodes a `TableParquetOptionsProto` payload into a
        /// [`ParquetFormatFactory`] whose options are always `Some`.
        ///
        /// # Errors
        /// Returns an execution error if `buf` is not a valid
        /// `TableParquetOptionsProto` or carries an unparsable writer version.
        fn try_decode_file_format(
            &self,
            buf: &[u8],
            _ctx: &TaskContext,
        ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
            let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
            })?;
            // Fallible conversion: malformed input must surface as an error,
            // not a panic.
            let options = table_parquet_options_from_proto(&proto)?;
            Ok(Arc::new(ParquetFormatFactory {
                options: Some(options),
            }))
        }

        /// Encodes the options of a [`ParquetFormatFactory`] as a
        /// `TableParquetOptionsProto`; a factory without options is encoded as
        /// `TableParquetOptions::default()`.
        ///
        /// # Errors
        /// Returns an execution error if `node` is not a
        /// [`ParquetFormatFactory`] or if protobuf encoding fails.
        fn try_encode_file_format(
            &self,
            buf: &mut Vec<u8>,
            node: Arc<dyn FileFormatFactory>,
        ) -> datafusion_common::Result<()> {
            let Some(parquet_factory) = node.downcast_ref::<ParquetFormatFactory>() else {
                return exec_err!("Unsupported FileFormatFactory type");
            };

            let options = parquet_factory.options.clone().unwrap_or_default();
            let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
                options: Some(options),
            });

            proto.encode(buf).map_err(|e| {
                exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}")
            })?;

            Ok(())
        }
    }
}
721#[cfg(feature = "parquet")]
722pub use parquet::ParquetLogicalExtensionCodec;
723
/// [`LogicalExtensionCodec`] for [`ArrowFormatFactory`].
///
/// No options are serialized: file-format encoding writes nothing and
/// decoding always produces `ArrowFormatFactory::new()`. Extension nodes and
/// table providers are rejected with a "not implemented" error.
#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
726
727impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
729 fn try_decode(
730 &self,
731 _buf: &[u8],
732 _inputs: &[datafusion_expr::LogicalPlan],
733 _ctx: &TaskContext,
734 ) -> datafusion_common::Result<datafusion_expr::Extension> {
735 not_impl_err!("Method not implemented")
736 }
737
738 fn try_encode(
739 &self,
740 _node: &datafusion_expr::Extension,
741 _buf: &mut Vec<u8>,
742 ) -> datafusion_common::Result<()> {
743 not_impl_err!("Method not implemented")
744 }
745
746 fn try_decode_table_provider(
747 &self,
748 _buf: &[u8],
749 _table_ref: &TableReference,
750 _schema: arrow::datatypes::SchemaRef,
751 _ctx: &TaskContext,
752 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
753 not_impl_err!("Method not implemented")
754 }
755
756 fn try_encode_table_provider(
757 &self,
758 _table_ref: &TableReference,
759 _node: Arc<dyn datafusion_catalog::TableProvider>,
760 _buf: &mut Vec<u8>,
761 ) -> datafusion_common::Result<()> {
762 not_impl_err!("Method not implemented")
763 }
764
765 fn try_decode_file_format(
766 &self,
767 __buf: &[u8],
768 __ctx: &TaskContext,
769 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
770 Ok(Arc::new(ArrowFormatFactory::new()))
771 }
772
773 fn try_encode_file_format(
774 &self,
775 __buf: &mut Vec<u8>,
776 __node: Arc<dyn FileFormatFactory>,
777 ) -> datafusion_common::Result<()> {
778 Ok(())
779 }
780}
781
/// [`LogicalExtensionCodec`] registered for the Avro file format.
///
/// No Avro options are serialized: file-format encoding writes nothing.
/// Extension nodes and table providers are rejected with a
/// "not implemented" error.
#[derive(Debug)]
pub struct AvroLogicalExtensionCodec;
784
785impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
787 fn try_decode(
788 &self,
789 _buf: &[u8],
790 _inputs: &[datafusion_expr::LogicalPlan],
791 _ctx: &TaskContext,
792 ) -> datafusion_common::Result<datafusion_expr::Extension> {
793 not_impl_err!("Method not implemented")
794 }
795
796 fn try_encode(
797 &self,
798 _node: &datafusion_expr::Extension,
799 _buf: &mut Vec<u8>,
800 ) -> datafusion_common::Result<()> {
801 not_impl_err!("Method not implemented")
802 }
803
804 fn try_decode_table_provider(
805 &self,
806 _buf: &[u8],
807 _table_ref: &TableReference,
808 _schema: arrow::datatypes::SchemaRef,
809 _cts: &TaskContext,
810 ) -> datafusion_common::Result<Arc<dyn datafusion_catalog::TableProvider>> {
811 not_impl_err!("Method not implemented")
812 }
813
814 fn try_encode_table_provider(
815 &self,
816 _table_ref: &TableReference,
817 _node: Arc<dyn datafusion_catalog::TableProvider>,
818 _buf: &mut Vec<u8>,
819 ) -> datafusion_common::Result<()> {
820 not_impl_err!("Method not implemented")
821 }
822
823 fn try_decode_file_format(
824 &self,
825 __buf: &[u8],
826 __ctx: &TaskContext,
827 ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
828 Ok(Arc::new(ArrowFormatFactory::new()))
829 }
830
831 fn try_encode_file_format(
832 &self,
833 __buf: &mut Vec<u8>,
834 __node: Arc<dyn FileFormatFactory>,
835 ) -> datafusion_common::Result<()> {
836 Ok(())
837 }
838}