1use std::sync::Arc;
21
22use arrow::array::RecordBatch;
23use arrow::compute::SortOptions;
24use arrow::datatypes::{Field, Schema};
25use arrow::ipc::reader::StreamReader;
26use chrono::{TimeZone, Utc};
27use datafusion_common::{DataFusionError, Result, internal_datafusion_err, not_impl_err};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::FileSinkConfig;
32use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile, TableSchema};
33use datafusion_datasource_csv::file_format::CsvSink;
34use datafusion_datasource_json::file_format::JsonSink;
35#[cfg(feature = "parquet")]
36use datafusion_datasource_parquet::file_format::ParquetSink;
37use datafusion_execution::object_store::ObjectStoreUrl;
38use datafusion_execution::{FunctionRegistry, TaskContext};
39use datafusion_expr::WindowFunctionDefinition;
40use datafusion_expr::dml::InsertOp;
41use datafusion_expr::execution_props::SubqueryIndex;
42use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
43use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
44use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
45use datafusion_physical_plan::expressions::{
46 BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
47 NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
48};
49use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
50use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
51use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
52use datafusion_proto_common::common::proto_error;
53use object_store::ObjectMeta;
54use object_store::path::Path;
55
56use super::{
57 DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalPlanDecodeContext,
58 PhysicalProtoConverterExtension,
59};
60use crate::logical_plan::{self};
61use crate::protobuf::physical_expr_node::ExprType;
62use crate::{convert_required, protobuf};
63use datafusion_physical_expr::expressions::{
64 DynamicFilterInner, DynamicFilterPhysicalExpr,
65};
66
67impl From<&protobuf::PhysicalColumn> for Column {
68 fn from(c: &protobuf::PhysicalColumn) -> Column {
69 Column::new(&c.name, c.index as usize)
70 }
71}
72
73pub fn parse_physical_sort_expr(
85 proto: &protobuf::PhysicalSortExprNode,
86 ctx: &PhysicalPlanDecodeContext<'_>,
87 input_schema: &Schema,
88 proto_converter: &dyn PhysicalProtoConverterExtension,
89) -> Result<PhysicalSortExpr> {
90 if let Some(expr) = &proto.expr {
91 let expr =
92 proto_converter.proto_to_physical_expr(expr.as_ref(), input_schema, ctx)?;
93 let options = SortOptions {
94 descending: !proto.asc,
95 nulls_first: proto.nulls_first,
96 };
97 Ok(PhysicalSortExpr { expr, options })
98 } else {
99 Err(proto_error("Unexpected empty physical expression"))
100 }
101}
102
103pub fn parse_physical_sort_exprs(
115 proto: &[protobuf::PhysicalSortExprNode],
116 ctx: &PhysicalPlanDecodeContext<'_>,
117 input_schema: &Schema,
118 proto_converter: &dyn PhysicalProtoConverterExtension,
119) -> Result<Vec<PhysicalSortExpr>> {
120 proto
121 .iter()
122 .map(|sort_expr| {
123 parse_physical_sort_expr(sort_expr, ctx, input_schema, proto_converter)
124 })
125 .collect()
126}
127
128pub fn parse_physical_window_expr(
141 proto: &protobuf::PhysicalWindowExprNode,
142 ctx: &PhysicalPlanDecodeContext<'_>,
143 input_schema: &Schema,
144 proto_converter: &dyn PhysicalProtoConverterExtension,
145) -> Result<Arc<dyn WindowExpr>> {
146 let window_node_expr =
147 parse_physical_exprs(&proto.args, ctx, input_schema, proto_converter)?;
148 let partition_by =
149 parse_physical_exprs(&proto.partition_by, ctx, input_schema, proto_converter)?;
150
151 let order_by =
152 parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, proto_converter)?;
153
154 let window_frame = proto
155 .window_frame
156 .as_ref()
157 .map(|wf| wf.clone().try_into())
158 .transpose()
159 .map_err(|e| internal_datafusion_err!("{e}"))?
160 .ok_or_else(|| {
161 internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
162 })?;
163
164 let fun = if let Some(window_func) = proto.window_function.as_ref() {
165 match window_func {
166 protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
167 WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
168 Some(buf) => ctx.codec().try_decode_udaf(udaf_name, buf)?,
169 None => ctx
170 .task_ctx()
171 .udaf(udaf_name)
172 .or_else(|_| ctx.codec().try_decode_udaf(udaf_name, &[]))?,
173 })
174 }
175 protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
176 WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
177 Some(buf) => ctx.codec().try_decode_udwf(udwf_name, buf)?,
178 None => ctx
179 .task_ctx()
180 .udwf(udwf_name)
181 .or_else(|_| ctx.codec().try_decode_udwf(udwf_name, &[]))?
182 })
183 }
184 }
185 } else {
186 return Err(proto_error("Missing required field in protobuf"));
187 };
188
189 let name = proto.name.clone();
190 let extended_schema =
192 schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
193 create_window_expr(
194 &fun,
195 name,
196 &window_node_expr,
197 &partition_by,
198 &order_by,
199 Arc::new(window_frame),
200 extended_schema,
201 proto.ignore_nulls,
202 proto.distinct,
203 None,
204 )
205}
206
207pub fn parse_physical_exprs<'a, I>(
208 protos: I,
209 ctx: &PhysicalPlanDecodeContext<'_>,
210 input_schema: &Schema,
211 proto_converter: &dyn PhysicalProtoConverterExtension,
212) -> Result<Vec<Arc<dyn PhysicalExpr>>>
213where
214 I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
215{
216 protos
217 .into_iter()
218 .map(|p| proto_converter.proto_to_physical_expr(p, input_schema, ctx))
219 .collect::<Result<Vec<_>>>()
220}
221
222pub fn parse_physical_expr(
233 proto: &protobuf::PhysicalExprNode,
234 ctx: &TaskContext,
235 input_schema: &Schema,
236 codec: &dyn PhysicalExtensionCodec,
237) -> Result<Arc<dyn PhysicalExpr>> {
238 let decode_ctx = PhysicalPlanDecodeContext::new(ctx, codec);
239 parse_physical_expr_with_converter(
240 proto,
241 input_schema,
242 &decode_ctx,
243 &DefaultPhysicalProtoConverter {},
244 )
245}
246
247pub fn parse_physical_expr_with_converter(
259 proto: &protobuf::PhysicalExprNode,
260 input_schema: &Schema,
261 ctx: &PhysicalPlanDecodeContext<'_>,
262 proto_converter: &dyn PhysicalProtoConverterExtension,
263) -> Result<Arc<dyn PhysicalExpr>> {
264 let expr_type = proto
265 .expr_type
266 .as_ref()
267 .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;
268
269 let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
270 ExprType::Column(c) => {
271 let pcol: Column = c.into();
272 Arc::new(pcol)
273 }
274 ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
275 ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
276 ExprType::BinaryExpr(binary_expr) => {
277 let op = logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?;
278 if !binary_expr.operands.is_empty() {
279 let operands: Vec<Arc<dyn PhysicalExpr>> = binary_expr
282 .operands
283 .iter()
284 .map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
285 .collect::<Result<Vec<_>>>()?;
286
287 if operands.len() < 2 {
288 return Err(proto_error(
289 "A binary expression must always have at least 2 operands",
290 ));
291 }
292
293 operands
294 .into_iter()
295 .reduce(|left, right| Arc::new(BinaryExpr::new(left, op, right)))
296 .expect(
297 "Binary expression could not be reduced to a single expression.",
298 )
299 } else {
300 Arc::new(BinaryExpr::new(
302 parse_required_physical_expr(
303 binary_expr.l.as_deref(),
304 ctx,
305 "left",
306 input_schema,
307 proto_converter,
308 )?,
309 op,
310 parse_required_physical_expr(
311 binary_expr.r.as_deref(),
312 ctx,
313 "right",
314 input_schema,
315 proto_converter,
316 )?,
317 ))
318 }
319 }
320 ExprType::AggregateExpr(_) => {
321 return not_impl_err!(
322 "Cannot convert aggregate expr node to physical expression"
323 );
324 }
325 ExprType::WindowExpr(_) => {
326 return not_impl_err!(
327 "Cannot convert window expr node to physical expression"
328 );
329 }
330 ExprType::Sort(_) => {
331 return not_impl_err!("Cannot convert sort expr node to physical expression");
332 }
333 ExprType::IsNullExpr(e) => {
334 Arc::new(IsNullExpr::new(parse_required_physical_expr(
335 e.expr.as_deref(),
336 ctx,
337 "expr",
338 input_schema,
339 proto_converter,
340 )?))
341 }
342 ExprType::IsNotNullExpr(e) => {
343 Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
344 e.expr.as_deref(),
345 ctx,
346 "expr",
347 input_schema,
348 proto_converter,
349 )?))
350 }
351 ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
352 e.expr.as_deref(),
353 ctx,
354 "expr",
355 input_schema,
356 proto_converter,
357 )?)),
358 ExprType::Negative(e) => {
359 Arc::new(NegativeExpr::new(parse_required_physical_expr(
360 e.expr.as_deref(),
361 ctx,
362 "expr",
363 input_schema,
364 proto_converter,
365 )?))
366 }
367 ExprType::InList(e) => in_list(
368 parse_required_physical_expr(
369 e.expr.as_deref(),
370 ctx,
371 "expr",
372 input_schema,
373 proto_converter,
374 )?,
375 parse_physical_exprs(&e.list, ctx, input_schema, proto_converter)?,
376 &e.negated,
377 input_schema,
378 )?,
379 ExprType::Case(e) => Arc::new(CaseExpr::try_new(
380 e.expr
381 .as_ref()
382 .map(|e| {
383 proto_converter.proto_to_physical_expr(e.as_ref(), input_schema, ctx)
384 })
385 .transpose()?,
386 e.when_then_expr
387 .iter()
388 .map(|e| {
389 Ok((
390 parse_required_physical_expr(
391 e.when_expr.as_ref(),
392 ctx,
393 "when_expr",
394 input_schema,
395 proto_converter,
396 )?,
397 parse_required_physical_expr(
398 e.then_expr.as_ref(),
399 ctx,
400 "then_expr",
401 input_schema,
402 proto_converter,
403 )?,
404 ))
405 })
406 .collect::<Result<Vec<_>>>()?,
407 e.else_expr
408 .as_ref()
409 .map(|e| {
410 proto_converter.proto_to_physical_expr(e.as_ref(), input_schema, ctx)
411 })
412 .transpose()?,
413 )?),
414 ExprType::Cast(e) => Arc::new(CastExpr::new(
415 parse_required_physical_expr(
416 e.expr.as_deref(),
417 ctx,
418 "expr",
419 input_schema,
420 proto_converter,
421 )?,
422 convert_required!(e.arrow_type)?,
423 None,
424 )),
425 ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
426 parse_required_physical_expr(
427 e.expr.as_deref(),
428 ctx,
429 "expr",
430 input_schema,
431 proto_converter,
432 )?,
433 convert_required!(e.arrow_type)?,
434 )),
435 ExprType::ScalarUdf(e) => {
436 let udf = match &e.fun_definition {
437 Some(buf) => ctx.codec().try_decode_udf(&e.name, buf)?,
438 None => ctx
439 .task_ctx()
440 .udf(e.name.as_str())
441 .or_else(|_| ctx.codec().try_decode_udf(&e.name, &[]))?,
442 };
443 let scalar_fun_def = Arc::clone(&udf);
444
445 let args = parse_physical_exprs(&e.args, ctx, input_schema, proto_converter)?;
446
447 let config_options = Arc::clone(ctx.task_ctx().session_config().options());
448
449 Arc::new(
450 ScalarFunctionExpr::new(
451 e.name.as_str(),
452 scalar_fun_def,
453 args,
454 Field::new(
455 &e.return_field_name,
456 convert_required!(e.return_type)?,
457 true,
458 )
459 .into(),
460 config_options,
461 )
462 .with_nullable(e.nullable),
463 )
464 }
465 ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
466 like_expr.negated,
467 like_expr.case_insensitive,
468 parse_required_physical_expr(
469 like_expr.expr.as_deref(),
470 ctx,
471 "expr",
472 input_schema,
473 proto_converter,
474 )?,
475 parse_required_physical_expr(
476 like_expr.pattern.as_deref(),
477 ctx,
478 "pattern",
479 input_schema,
480 proto_converter,
481 )?,
482 )),
483 ExprType::HashExpr(hash_expr) => {
484 let on_columns = parse_physical_exprs(
485 &hash_expr.on_columns,
486 ctx,
487 input_schema,
488 proto_converter,
489 )?;
490 Arc::new(HashExpr::new(
491 on_columns,
492 SeededRandomState::with_seed(hash_expr.seed0),
493 hash_expr.description.clone(),
494 ))
495 }
496 ExprType::ScalarSubquery(sq) => {
497 let data_type: arrow::datatypes::DataType = sq
498 .data_type
499 .as_ref()
500 .ok_or_else(|| {
501 proto_error("Missing data_type in PhysicalScalarSubqueryExprNode")
502 })?
503 .try_into()?;
504 let results = ctx.scalar_subquery_results().ok_or_else(|| {
505 proto_error(
506 "ScalarSubqueryExpr can only be deserialized as part \
507 of a surrounding ScalarSubqueryExec",
508 )
509 })?;
510 Arc::new(ScalarSubqueryExpr::new(
511 data_type,
512 sq.nullable,
513 SubqueryIndex::new(sq.index as usize),
514 results.clone(),
515 ))
516 }
517 ExprType::DynamicFilter(dynamic_filter) => {
518 let children = parse_physical_exprs(
519 &dynamic_filter.children,
520 ctx,
521 input_schema,
522 proto_converter,
523 )?;
524
525 let remapped_children = if !dynamic_filter.remapped_children.is_empty() {
526 Some(parse_physical_exprs(
527 &dynamic_filter.remapped_children,
528 ctx,
529 input_schema,
530 proto_converter,
531 )?)
532 } else {
533 None
534 };
535
536 let inner_expr = parse_required_physical_expr(
537 dynamic_filter.inner_expr.as_deref(),
538 ctx,
539 "inner_expr",
540 input_schema,
541 proto_converter,
542 )?;
543
544 let expression_id = proto.expr_id.ok_or_else(|| {
545 proto_error(
546 "DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \
547 to be set by the serializer",
548 )
549 })?;
550
551 let base_filter: Arc<dyn PhysicalExpr> =
552 Arc::new(DynamicFilterPhysicalExpr::from_parts(
553 children,
554 remapped_children,
555 DynamicFilterInner {
556 expression_id,
557 generation: dynamic_filter.generation,
558 expr: inner_expr,
559 is_complete: dynamic_filter.is_complete,
560 },
561 ));
562 base_filter
563 }
564 ExprType::Extension(extension) => {
565 let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
566 .inputs
567 .iter()
568 .map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
569 .collect::<Result<_>>()?;
570 ctx.codec()
571 .try_decode_expr(extension.expr.as_slice(), &inputs)? as _
572 }
573 };
574
575 Ok(pexpr)
576}
577
578fn parse_required_physical_expr(
579 expr: Option<&protobuf::PhysicalExprNode>,
580 ctx: &PhysicalPlanDecodeContext<'_>,
581 field: &str,
582 input_schema: &Schema,
583 proto_converter: &dyn PhysicalProtoConverterExtension,
584) -> Result<Arc<dyn PhysicalExpr>> {
585 expr.map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
586 .transpose()?
587 .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
588}
589
590pub fn parse_protobuf_hash_partitioning(
591 partitioning: Option<&protobuf::PhysicalHashRepartition>,
592 ctx: &PhysicalPlanDecodeContext<'_>,
593 input_schema: &Schema,
594 proto_converter: &dyn PhysicalProtoConverterExtension,
595) -> Result<Option<Partitioning>> {
596 match partitioning {
597 Some(hash_part) => {
598 let expr = parse_physical_exprs(
599 &hash_part.hash_expr,
600 ctx,
601 input_schema,
602 proto_converter,
603 )?;
604
605 Ok(Some(Partitioning::Hash(
606 expr,
607 hash_part.partition_count.try_into().unwrap(),
608 )))
609 }
610 None => Ok(None),
611 }
612}
613
614pub fn parse_protobuf_partitioning(
615 partitioning: Option<&protobuf::Partitioning>,
616 ctx: &PhysicalPlanDecodeContext<'_>,
617 input_schema: &Schema,
618 proto_converter: &dyn PhysicalProtoConverterExtension,
619) -> Result<Option<Partitioning>> {
620 match partitioning {
621 Some(protobuf::Partitioning { partition_method }) => match partition_method {
622 Some(protobuf::partitioning::PartitionMethod::RoundRobin(
623 partition_count,
624 )) => Ok(Some(Partitioning::RoundRobinBatch(
625 *partition_count as usize,
626 ))),
627 Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
628 parse_protobuf_hash_partitioning(
629 Some(hash_repartition),
630 ctx,
631 input_schema,
632 proto_converter,
633 )
634 }
635 Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
636 Ok(Some(Partitioning::UnknownPartitioning(
637 *partition_count as usize,
638 )))
639 }
640 None => Ok(None),
641 },
642 None => Ok(None),
643 }
644}
645
646pub fn parse_protobuf_file_scan_schema(
647 proto: &protobuf::FileScanExecConf,
648) -> Result<Arc<Schema>> {
649 Ok(Arc::new(convert_required!(proto.schema)?))
650}
651
652pub fn parse_table_schema_from_proto(
654 proto: &protobuf::FileScanExecConf,
655) -> Result<TableSchema> {
656 let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
657
658 let table_partition_cols = proto
660 .table_partition_cols
661 .iter()
662 .map(|col| Ok(Arc::new(schema.field_with_name(col)?.clone())))
663 .collect::<Result<Vec<_>>>()?;
664
665 let file_schema = Arc::new(
669 Schema::new(
670 schema
671 .fields()
672 .iter()
673 .filter(|field| !table_partition_cols.contains(field))
674 .cloned()
675 .collect::<Vec<_>>(),
676 )
677 .with_metadata(schema.metadata.clone()),
678 );
679
680 Ok(TableSchema::new(file_schema, table_partition_cols))
681}
682
683pub fn parse_protobuf_file_scan_config(
684 proto: &protobuf::FileScanExecConf,
685 ctx: &PhysicalPlanDecodeContext<'_>,
686 proto_converter: &dyn PhysicalProtoConverterExtension,
687 file_source: Arc<dyn FileSource>,
688) -> Result<FileScanConfig> {
689 let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
690
691 let constraints = convert_required!(proto.constraints)?;
692 let statistics = convert_required!(proto.statistics)?;
693
694 let file_groups = proto
695 .file_groups
696 .iter()
697 .map(|f| f.try_into())
698 .collect::<Result<Vec<_>, _>>()?;
699
700 let object_store_url = match proto.object_store_url.is_empty() {
701 false => ObjectStoreUrl::parse(&proto.object_store_url)?,
702 true => ObjectStoreUrl::local_filesystem(),
703 };
704
705 let mut output_ordering = vec![];
706 for node_collection in &proto.output_ordering {
707 let sort_exprs = parse_physical_sort_exprs(
708 &node_collection.physical_sort_expr_nodes,
709 ctx,
710 &schema,
711 proto_converter,
712 )?;
713 output_ordering.extend(LexOrdering::new(sort_exprs));
714 }
715
716 let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs {
718 let projection_exprs: Vec<ProjectionExpr> = proto_projection_exprs
719 .projections
720 .iter()
721 .map(|proto_expr| {
722 let expr = proto_converter.proto_to_physical_expr(
723 proto_expr.expr.as_ref().ok_or_else(|| {
724 internal_datafusion_err!("ProjectionExpr missing expr field")
725 })?,
726 &schema,
727 ctx,
728 )?;
729 Ok(ProjectionExpr::new(expr, proto_expr.alias.clone()))
730 })
731 .collect::<Result<Vec<_>>>()?;
732
733 let projection_exprs = ProjectionExprs::new(projection_exprs);
734
735 file_source
737 .try_pushdown_projection(&projection_exprs)?
738 .unwrap_or(file_source)
739 } else {
740 file_source
741 };
742
743 let config = FileScanConfigBuilder::new(object_store_url, file_source)
744 .with_file_groups(file_groups)
745 .with_constraints(constraints)
746 .with_statistics(statistics)
747 .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
748 .with_output_ordering(output_ordering)
749 .with_batch_size(proto.batch_size.map(|s| s as usize))
750 .build();
751 Ok(config)
752}
753
754pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
755 if buf.is_empty() {
756 return Ok(vec![]);
757 }
758 let reader = StreamReader::try_new(buf, None)?;
759 let mut batches = Vec::new();
760 for batch in reader {
761 batches.push(batch?);
762 }
763 Ok(batches)
764}
765
766impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
767 type Error = DataFusionError;
768
769 fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
770 let mut pf = PartitionedFile::new_from_meta(ObjectMeta {
771 location: Path::parse(val.path.as_str())
772 .map_err(|e| proto_error(format!("Invalid object_store path: {e}")))?,
773 last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
774 size: val.size,
775 e_tag: None,
776 version: None,
777 })
778 .with_partition_values(
779 val.partition_values
780 .iter()
781 .map(|v| v.try_into())
782 .collect::<Result<Vec<_>, _>>()?,
783 );
784 if let Some(range) = val.range.as_ref() {
785 let file_range: FileRange = range.try_into()?;
786 pf = pf.with_range(file_range.start, file_range.end);
787 }
788 if let Some(proto_stats) = val.statistics.as_ref() {
789 pf = pf.with_statistics(Arc::new(proto_stats.try_into()?));
790 }
791 Ok(pf)
792 }
793}
794
795impl TryFrom<&protobuf::FileRange> for FileRange {
796 type Error = DataFusionError;
797
798 fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
799 Ok(FileRange {
800 start: value.start,
801 end: value.end,
802 })
803 }
804}
805
806impl TryFrom<&protobuf::FileGroup> for FileGroup {
807 type Error = DataFusionError;
808
809 fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
810 let files = val
811 .files
812 .iter()
813 .map(|f| f.try_into())
814 .collect::<Result<Vec<_>, _>>()?;
815 Ok(FileGroup::new(files))
816 }
817}
818
819impl TryFrom<&protobuf::JsonSink> for JsonSink {
820 type Error = DataFusionError;
821
822 fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
823 Ok(Self::new(
824 convert_required!(value.config)?,
825 convert_required!(value.writer_options)?,
826 ))
827 }
828}
829
/// Rebuilds a [`ParquetSink`] from its protobuf form; both the sink config
/// and the parquet options are required.
#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        let config = convert_required!(value.config)?;
        let parquet_options = convert_required!(value.parquet_options)?;
        Ok(Self::new(config, parquet_options))
    }
}
841
842impl TryFrom<&protobuf::CsvSink> for CsvSink {
843 type Error = DataFusionError;
844
845 fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
846 Ok(Self::new(
847 convert_required!(value.config)?,
848 convert_required!(value.writer_options)?,
849 ))
850 }
851}
852
853impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
854 type Error = DataFusionError;
855
856 fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
857 let file_group = FileGroup::new(
858 conf.file_groups
859 .iter()
860 .map(|f| f.try_into())
861 .collect::<Result<Vec<_>>>()?,
862 );
863 let table_paths = conf
864 .table_paths
865 .iter()
866 .map(ListingTableUrl::parse)
867 .collect::<Result<Vec<_>>>()?;
868 let table_partition_cols = conf
869 .table_partition_cols
870 .iter()
871 .map(|protobuf::PartitionColumn { name, arrow_type }| {
872 let data_type = convert_required!(arrow_type)?;
873 Ok((name.clone(), data_type))
874 })
875 .collect::<Result<Vec<_>>>()?;
876 let insert_op = match conf.insert_op() {
877 protobuf::InsertOp::Append => InsertOp::Append,
878 protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
879 protobuf::InsertOp::Replace => InsertOp::Replace,
880 };
881 let file_output_mode = match conf.file_output_mode() {
882 protobuf::FileOutputMode::Automatic => {
883 datafusion_datasource::file_sink_config::FileOutputMode::Automatic
884 }
885 protobuf::FileOutputMode::SingleFile => {
886 datafusion_datasource::file_sink_config::FileOutputMode::SingleFile
887 }
888 protobuf::FileOutputMode::Directory => {
889 datafusion_datasource::file_sink_config::FileOutputMode::Directory
890 }
891 };
892 Ok(Self {
893 original_url: String::default(),
894 object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
895 file_group,
896 table_paths,
897 output_schema: Arc::new(convert_required!(conf.output_schema)?),
898 table_partition_cols,
899 insert_op,
900 keep_partition_by_columns: conf.keep_partition_by_columns,
901 file_extension: conf.file_extension.clone(),
902 file_output_mode,
903 })
904 }
905}
906
#[cfg(test)]
mod tests {

    use super::*;

    /// A path containing percent-encoded segments must survive a round trip
    /// through the protobuf representation unchanged.
    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        let path_str = "foo/foo%2Fbar/baz%252Fqux";
        let meta = ObjectMeta {
            location: Path::parse(path_str).unwrap(),
            last_modified: Utc.timestamp_nanos(1_000),
            size: 42,
            e_tag: None,
            version: None,
        };
        let original = PartitionedFile::new_from_meta(meta);

        let encoded = protobuf::PartitionedFile::try_from(&original).unwrap();
        assert_eq!(encoded.path, path_str);

        let decoded = PartitionedFile::try_from(&encoded).unwrap();
        assert_eq!(decoded.object_meta.location.as_ref(), path_str);
        assert_eq!(decoded.object_meta.location, original.object_meta.location);
        assert_eq!(decoded.object_meta.size, original.object_meta.size);
        assert_eq!(
            decoded.object_meta.last_modified,
            original.object_meta.last_modified
        );
    }

    /// A path that `Path::parse` rejects must surface as a decode error
    /// rather than a panic.
    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let malformed = protobuf::PartitionedFile {
            path: "foo//bar".to_string(),
            size: 1,
            last_modified_ns: 0,
            partition_values: vec![],
            range: None,
            statistics: None,
        };

        let message = PartitionedFile::try_from(&malformed)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Invalid object_store path"));
    }
}