1use std::sync::Arc;
21
22use arrow::array::RecordBatch;
23use arrow::compute::SortOptions;
24use arrow::datatypes::{Field, Schema};
25use arrow::ipc::reader::StreamReader;
26use chrono::{TimeZone, Utc};
27use datafusion_common::{DataFusionError, Result, internal_datafusion_err, not_impl_err};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::FileSinkConfig;
32use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile, TableSchema};
33use datafusion_datasource_csv::file_format::CsvSink;
34use datafusion_datasource_json::file_format::JsonSink;
35#[cfg(feature = "parquet")]
36use datafusion_datasource_parquet::file_format::ParquetSink;
37use datafusion_execution::object_store::ObjectStoreUrl;
38use datafusion_execution::{FunctionRegistry, TaskContext};
39use datafusion_expr::WindowFunctionDefinition;
40use datafusion_expr::dml::InsertOp;
41use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
42use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
43use datafusion_physical_plan::expressions::{
44 BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
45 NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
46};
47use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
48use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
49use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
50use datafusion_proto_common::common::proto_error;
51use object_store::ObjectMeta;
52use object_store::path::Path;
53
54use super::{
55 DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
56 PhysicalProtoConverterExtension,
57};
58use crate::logical_plan::{self};
59use crate::protobuf::physical_expr_node::ExprType;
60use crate::{convert_required, protobuf};
61
62impl From<&protobuf::PhysicalColumn> for Column {
63 fn from(c: &protobuf::PhysicalColumn) -> Column {
64 Column::new(&c.name, c.index as usize)
65 }
66}
67
68pub fn parse_physical_sort_expr(
78 proto: &protobuf::PhysicalSortExprNode,
79 ctx: &TaskContext,
80 input_schema: &Schema,
81 codec: &dyn PhysicalExtensionCodec,
82 proto_converter: &dyn PhysicalProtoConverterExtension,
83) -> Result<PhysicalSortExpr> {
84 if let Some(expr) = &proto.expr {
85 let expr = proto_converter.proto_to_physical_expr(
86 expr.as_ref(),
87 ctx,
88 input_schema,
89 codec,
90 )?;
91 let options = SortOptions {
92 descending: !proto.asc,
93 nulls_first: proto.nulls_first,
94 };
95 Ok(PhysicalSortExpr { expr, options })
96 } else {
97 Err(proto_error("Unexpected empty physical expression"))
98 }
99}
100
101pub fn parse_physical_sort_exprs(
111 proto: &[protobuf::PhysicalSortExprNode],
112 ctx: &TaskContext,
113 input_schema: &Schema,
114 codec: &dyn PhysicalExtensionCodec,
115 proto_converter: &dyn PhysicalProtoConverterExtension,
116) -> Result<Vec<PhysicalSortExpr>> {
117 proto
118 .iter()
119 .map(|sort_expr| {
120 parse_physical_sort_expr(sort_expr, ctx, input_schema, codec, proto_converter)
121 })
122 .collect()
123}
124
/// Deserializes a [`protobuf::PhysicalWindowExprNode`] into a physical
/// [`WindowExpr`].
///
/// Rebuilds the function arguments, PARTITION BY and ORDER BY expressions,
/// resolves the window function (a user-defined aggregate or window UDF),
/// and delegates construction to `create_window_expr` against a schema
/// extended with the window's output field.
///
/// # Errors
/// Returns an error when the window frame or window function is missing,
/// when function resolution fails in both the context registry and the
/// codec, or when any child expression fails to deserialize.
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr =
        parse_physical_exprs(&proto.args, ctx, input_schema, codec, proto_converter)?;
    let partition_by = parse_physical_exprs(
        &proto.partition_by,
        ctx,
        input_schema,
        codec,
        proto_converter,
    )?;

    let order_by = parse_physical_sort_exprs(
        &proto.order_by,
        ctx,
        input_schema,
        codec,
        proto_converter,
    )?;

    // The frame is required: a failed conversion and an absent frame are both
    // surfaced as internal errors (the `transpose` turns Option<Result> into
    // Result<Option> so the conversion error is checked first).
    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| internal_datafusion_err!("{e}"))?
        .ok_or_else(|| {
            internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
        })?;

    // Resolution order for the function in both variants: an embedded
    // `fun_definition` payload is decoded by the codec; otherwise look the
    // name up in the task context, falling back to the codec with an empty
    // payload.
    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error("Missing required field in protobuf"));
    };

    let name = proto.name.clone();
    // Extend the input schema with the window expression's output field
    // before building the expression itself.
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}
206
207pub fn parse_physical_exprs<'a, I>(
208 protos: I,
209 ctx: &TaskContext,
210 input_schema: &Schema,
211 codec: &dyn PhysicalExtensionCodec,
212 proto_converter: &dyn PhysicalProtoConverterExtension,
213) -> Result<Vec<Arc<dyn PhysicalExpr>>>
214where
215 I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
216{
217 protos
218 .into_iter()
219 .map(|p| proto_converter.proto_to_physical_expr(p, ctx, input_schema, codec))
220 .collect::<Result<Vec<_>>>()
221}
222
223pub fn parse_physical_expr(
233 proto: &protobuf::PhysicalExprNode,
234 ctx: &TaskContext,
235 input_schema: &Schema,
236 codec: &dyn PhysicalExtensionCodec,
237) -> Result<Arc<dyn PhysicalExpr>> {
238 parse_physical_expr_with_converter(
239 proto,
240 ctx,
241 input_schema,
242 codec,
243 &DefaultPhysicalProtoConverter {},
244 )
245}
246
/// Deserializes a [`protobuf::PhysicalExprNode`] into an
/// `Arc<dyn PhysicalExpr>`, using `proto_converter` to decode every nested
/// child expression (allowing callers to override recursive decoding).
///
/// Aggregate, window, and sort nodes are rejected here: they are not
/// stand-alone physical expressions and must be handled by the plan-level
/// decoders instead.
///
/// # Errors
/// Returns an error when the node carries no `expr_type`, a required child
/// is missing, a UDF cannot be resolved via the context or codec, or any
/// nested conversion fails.
pub fn parse_physical_expr_with_converter(
    proto: &protobuf::PhysicalExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn PhysicalExpr>> {
    // A node without an expr_type is malformed serialized input.
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        // Leaf expressions: column references and literals.
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        // Binary operator: both operands are required children; the operator
        // name is mapped through the shared logical-plan conversion.
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
                proto_converter,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
                proto_converter,
            )?,
        )),
        // These node kinds are decoded at the plan level, not as expressions.
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        // Unary wrappers around a single required child expression.
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
            proto_converter,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?))
        }
        // IN list: the `in_list` helper validates the list against the schema.
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec, proto_converter)?,
            &e.negated,
            input_schema,
        )?,
        // CASE: the base expression and the ELSE branch are optional; every
        // WHEN/THEN pair requires both sides.
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| {
                    proto_converter.proto_to_physical_expr(
                        e.as_ref(),
                        ctx,
                        input_schema,
                        codec,
                    )
                })
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                            proto_converter,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                            proto_converter,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| {
                    proto_converter.proto_to_physical_expr(
                        e.as_ref(),
                        ctx,
                        input_schema,
                        codec,
                    )
                })
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            convert_required!(e.arrow_type)?,
            // No explicit cast options are serialized; use the default.
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        // Scalar UDF: an embedded `fun_definition` payload wins; otherwise
        // resolve by name via the context, falling back to the codec with an
        // empty payload.
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args =
                parse_physical_exprs(&e.args, ctx, input_schema, codec, proto_converter)?;

            let config_options = Arc::clone(ctx.session_config().options());

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                .with_nullable(e.nullable),
            )
        }
        // LIKE: both the matched expression and the pattern are required.
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
                proto_converter,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
                proto_converter,
            )?,
        )),
        // Hash expression: rebuilt with the exact serialized seeds so hash
        // values match the producer's.
        ExprType::HashExpr(hash_expr) => {
            let on_columns = parse_physical_exprs(
                &hash_expr.on_columns,
                ctx,
                input_schema,
                codec,
                proto_converter,
            )?;
            Arc::new(HashExpr::new(
                on_columns,
                SeededRandomState::with_seeds(
                    hash_expr.seed0,
                    hash_expr.seed1,
                    hash_expr.seed2,
                    hash_expr.seed3,
                ),
                hash_expr.description.clone(),
            ))
        }
        // Extension point: decode children first, then hand the opaque bytes
        // plus decoded inputs to the user codec.
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| {
                    proto_converter.proto_to_physical_expr(e, ctx, input_schema, codec)
                })
                .collect::<Result<_>>()?;
            codec.try_decode_expr(extension.expr.as_slice(), &inputs)? as _
        }
    };

    Ok(pexpr)
}
512
513fn parse_required_physical_expr(
514 expr: Option<&protobuf::PhysicalExprNode>,
515 ctx: &TaskContext,
516 field: &str,
517 input_schema: &Schema,
518 codec: &dyn PhysicalExtensionCodec,
519 proto_converter: &dyn PhysicalProtoConverterExtension,
520) -> Result<Arc<dyn PhysicalExpr>> {
521 expr.map(|e| proto_converter.proto_to_physical_expr(e, ctx, input_schema, codec))
522 .transpose()?
523 .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
524}
525
526pub fn parse_protobuf_hash_partitioning(
527 partitioning: Option<&protobuf::PhysicalHashRepartition>,
528 ctx: &TaskContext,
529 input_schema: &Schema,
530 codec: &dyn PhysicalExtensionCodec,
531 proto_converter: &dyn PhysicalProtoConverterExtension,
532) -> Result<Option<Partitioning>> {
533 match partitioning {
534 Some(hash_part) => {
535 let expr = parse_physical_exprs(
536 &hash_part.hash_expr,
537 ctx,
538 input_schema,
539 codec,
540 proto_converter,
541 )?;
542
543 Ok(Some(Partitioning::Hash(
544 expr,
545 hash_part.partition_count.try_into().unwrap(),
546 )))
547 }
548 None => Ok(None),
549 }
550}
551
552pub fn parse_protobuf_partitioning(
553 partitioning: Option<&protobuf::Partitioning>,
554 ctx: &TaskContext,
555 input_schema: &Schema,
556 codec: &dyn PhysicalExtensionCodec,
557 proto_converter: &dyn PhysicalProtoConverterExtension,
558) -> Result<Option<Partitioning>> {
559 match partitioning {
560 Some(protobuf::Partitioning { partition_method }) => match partition_method {
561 Some(protobuf::partitioning::PartitionMethod::RoundRobin(
562 partition_count,
563 )) => Ok(Some(Partitioning::RoundRobinBatch(
564 *partition_count as usize,
565 ))),
566 Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
567 parse_protobuf_hash_partitioning(
568 Some(hash_repartition),
569 ctx,
570 input_schema,
571 codec,
572 proto_converter,
573 )
574 }
575 Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
576 Ok(Some(Partitioning::UnknownPartitioning(
577 *partition_count as usize,
578 )))
579 }
580 None => Ok(None),
581 },
582 None => Ok(None),
583 }
584}
585
586pub fn parse_protobuf_file_scan_schema(
587 proto: &protobuf::FileScanExecConf,
588) -> Result<Arc<Schema>> {
589 Ok(Arc::new(convert_required!(proto.schema)?))
590}
591
592pub fn parse_table_schema_from_proto(
594 proto: &protobuf::FileScanExecConf,
595) -> Result<TableSchema> {
596 let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
597
598 let table_partition_cols = proto
600 .table_partition_cols
601 .iter()
602 .map(|col| Ok(Arc::new(schema.field_with_name(col)?.clone())))
603 .collect::<Result<Vec<_>>>()?;
604
605 let file_schema = Arc::new(
609 Schema::new(
610 schema
611 .fields()
612 .iter()
613 .filter(|field| !table_partition_cols.contains(field))
614 .cloned()
615 .collect::<Vec<_>>(),
616 )
617 .with_metadata(schema.metadata.clone()),
618 );
619
620 Ok(TableSchema::new(file_schema, table_partition_cols))
621}
622
/// Deserializes a [`protobuf::FileScanExecConf`] into a [`FileScanConfig`],
/// rebuilding file groups, constraints, statistics, output orderings, and —
/// when projection expressions were serialized — pushing the projection down
/// into `file_source`.
///
/// # Errors
/// Returns an error if any embedded field (schema, constraints, statistics,
/// file groups, object store URL, orderings, or projection expressions)
/// fails to deserialize.
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &TaskContext,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    // An empty URL means the config was produced without an explicit object
    // store; default to the local filesystem.
    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    // Rebuild each serialized lexicographic ordering; sort expressions are
    // decoded against the combined (file + partition) schema.
    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
            proto_converter,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    // If projection expressions were serialized, try to push them down into
    // the file source; a source that declines (`None`) is kept unchanged.
    let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs {
        let projection_exprs: Vec<ProjectionExpr> = proto_projection_exprs
            .projections
            .iter()
            .map(|proto_expr| {
                let expr = proto_converter.proto_to_physical_expr(
                    proto_expr.expr.as_ref().ok_or_else(|| {
                        internal_datafusion_err!("ProjectionExpr missing expr field")
                    })?,
                    ctx,
                    &schema,
                    codec,
                )?;
                Ok(ProjectionExpr::new(expr, proto_expr.alias.clone()))
            })
            .collect::<Result<Vec<_>>>()?;

        let projection_exprs = ProjectionExprs::new(projection_exprs);

        file_source
            .try_pushdown_projection(&projection_exprs)?
            .unwrap_or(file_source)
    } else {
        file_source
    };

    let config = FileScanConfigBuilder::new(object_store_url, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}
696
697pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
698 if buf.is_empty() {
699 return Ok(vec![]);
700 }
701 let reader = StreamReader::try_new(buf, None)?;
702 let mut batches = Vec::new();
703 for batch in reader {
704 batches.push(batch?);
705 }
706 Ok(batches)
707}
708
impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    /// Rebuilds a [`PartitionedFile`] from its protobuf form: object
    /// metadata, partition values, and the optional byte range and
    /// statistics.
    ///
    /// `Path::parse` is used (rather than a lossy constructor) so the
    /// serialized path string is validated and taken as-is; the percent-
    /// encoding round-trip is pinned by the tests below.
    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        let mut pf = PartitionedFile::new_from_meta(ObjectMeta {
            location: Path::parse(val.path.as_str())
                .map_err(|e| proto_error(format!("Invalid object_store path: {e}")))?,
            // NOTE(review): `as i64` wraps for nanosecond values above
            // i64::MAX — assumed unreachable for real timestamps; confirm.
            last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
            size: val.size,
            // e_tag/version are not serialized, so they cannot be restored.
            e_tag: None,
            version: None,
        })
        .with_partition_values(
            val.partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
        );
        // Optional byte range within the file (for split scans).
        if let Some(range) = val.range.as_ref() {
            let file_range: FileRange = range.try_into()?;
            pf = pf.with_range(file_range.start, file_range.end);
        }
        // Optional per-file statistics.
        if let Some(proto_stats) = val.statistics.as_ref() {
            pf = pf.with_statistics(Arc::new(proto_stats.try_into()?));
        }
        Ok(pf)
    }
}
737
738impl TryFrom<&protobuf::FileRange> for FileRange {
739 type Error = DataFusionError;
740
741 fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
742 Ok(FileRange {
743 start: value.start,
744 end: value.end,
745 })
746 }
747}
748
749impl TryFrom<&protobuf::FileGroup> for FileGroup {
750 type Error = DataFusionError;
751
752 fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
753 let files = val
754 .files
755 .iter()
756 .map(|f| f.try_into())
757 .collect::<Result<Vec<_>, _>>()?;
758 Ok(FileGroup::new(files))
759 }
760}
761
762impl TryFrom<&protobuf::JsonSink> for JsonSink {
763 type Error = DataFusionError;
764
765 fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
766 Ok(Self::new(
767 convert_required!(value.config)?,
768 convert_required!(value.writer_options)?,
769 ))
770 }
771}
772
#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    /// Rebuilds a [`ParquetSink`] from its required sink config and Parquet
    /// writer options; either field being absent or malformed is an error.
    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        let config = convert_required!(value.config)?;
        let parquet_options = convert_required!(value.parquet_options)?;
        Ok(Self::new(config, parquet_options))
    }
}
784
785impl TryFrom<&protobuf::CsvSink> for CsvSink {
786 type Error = DataFusionError;
787
788 fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
789 Ok(Self::new(
790 convert_required!(value.config)?,
791 convert_required!(value.writer_options)?,
792 ))
793 }
794}
795
impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    /// Rebuilds a [`FileSinkConfig`] from its protobuf form: file group,
    /// table paths, partition columns, insert operation, output mode, and
    /// output schema.
    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        // Partition columns are (name, data type) pairs; the arrow type is a
        // required sub-message.
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        // Map the proto enums onto their in-memory counterparts; exhaustive
        // matches so new variants force a compile error here.
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        let file_output_mode = match conf.file_output_mode() {
            protobuf::FileOutputMode::Automatic => {
                datafusion_datasource::file_sink_config::FileOutputMode::Automatic
            }
            protobuf::FileOutputMode::SingleFile => {
                datafusion_datasource::file_sink_config::FileOutputMode::SingleFile
            }
            protobuf::FileOutputMode::Directory => {
                datafusion_datasource::file_sink_config::FileOutputMode::Directory
            }
        };
        Ok(Self {
            // The original URL is not serialized, so it cannot be restored.
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
            file_output_mode,
        })
    }
}
849
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use datafusion_datasource::PartitionedFile;
    use object_store::ObjectMeta;
    use object_store::path::Path;

    use super::*;

    /// A percent-encoded object-store path must survive the
    /// PartitionedFile -> proto -> PartitionedFile round trip byte-for-byte
    /// (i.e. it is neither decoded nor re-encoded along the way).
    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        // Mixes an encoded slash (%2F) and a doubly-encoded one (%252F).
        let path_str = "foo/foo%2Fbar/baz%252Fqux";
        let pf = PartitionedFile::new_from_meta(ObjectMeta {
            location: Path::parse(path_str).unwrap(),
            last_modified: Utc.timestamp_nanos(1_000),
            size: 42,
            e_tag: None,
            version: None,
        });

        let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
        assert_eq!(proto.path, path_str);

        let pf2 = PartitionedFile::try_from(&proto).unwrap();
        assert_eq!(pf2.object_meta.location.as_ref(), path_str);
        assert_eq!(pf2.object_meta.location, pf.object_meta.location);
        assert_eq!(pf2.object_meta.size, pf.object_meta.size);
        assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified);
    }

    /// A path `Path::parse` rejects (here: an empty segment from "//") must
    /// surface as a descriptive error, not a panic.
    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let proto = protobuf::PartitionedFile {
            path: "foo//bar".to_string(),
            size: 1,
            last_modified_ns: 0,
            partition_values: vec![],
            range: None,
            statistics: None,
        };

        let err = PartitionedFile::try_from(&proto).unwrap_err();
        assert!(err.to_string().contains("Invalid object_store path"));
    }
}
894}