use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::ipc::reader::StreamReader;
use chrono::{TimeZone, Utc};
use datafusion_expr::dml::InsertOp;
use object_store::path::Path;
use object_store::ObjectMeta;

use arrow::datatypes::Schema;
use datafusion_common::{internal_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::{FileRange, ListingTableUrl, PartitionedFile};
use datafusion_datasource_csv::file_format::CsvSink;
use datafusion_datasource_json::file_format::JsonSink;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::file_format::ParquetSink;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{FunctionRegistry, TaskContext};
use datafusion_expr::WindowFunctionDefinition;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion_physical_plan::expressions::{
    in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion_proto_common::common::proto_error;

use crate::convert_required;
use crate::logical_plan::{self};
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;

use super::PhysicalExtensionCodec;

impl From<&protobuf::PhysicalColumn> for Column {
    fn from(c: &protobuf::PhysicalColumn) -> Column {
        Column::new(&c.name, c.index as usize)
    }
}

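/// Parses a [`protobuf::PhysicalSortExprNode`] into a [`PhysicalSortExpr`], resolving
/// the wrapped expression against `input_schema` and restoring its sort options.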
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        let expr = parse_physical_expr(expr.as_ref(), ctx, input_schema, codec)?;
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}

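/// Parses a slice of [`protobuf::PhysicalSortExprNode`]s into a vector of
/// [`PhysicalSortExpr`]s, failing on the first node that cannot be converted.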
pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
    proto
        .iter()
        .map(|sort_expr| parse_physical_sort_expr(sort_expr, ctx, input_schema, codec))
        .collect()
}

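/// Parses a [`protobuf::PhysicalWindowExprNode`] into a [`WindowExpr`], rebuilding the
/// window function's arguments, partitioning, ordering, and window frame. The
/// referenced UDAF or UDWF is resolved from the serialized definition when present,
/// otherwise from the [`TaskContext`] registry with `codec` as a fallback.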
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr = parse_physical_exprs(&proto.args, ctx, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, ctx, input_schema, codec)?;

    let order_by = parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, codec)?;

    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| internal_datafusion_err!("{e}"))?
        .ok_or_else(|| {
            internal_datafusion_err!("Missing required field 'window_frame' in protobuf")
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error("Missing required field in protobuf"));
    };

    let name = proto.name.clone();
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}

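/// Parses an iterator of [`protobuf::PhysicalExprNode`]s into a vector of physical
/// expressions, failing on the first node that cannot be converted.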
pub fn parse_physical_exprs<'a, I>(
    protos: I,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
{
    protos
        .into_iter()
        .map(|p| parse_physical_expr(p, ctx, input_schema, codec))
        .collect::<Result<Vec<_>>>()
}

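/// Parses a [`protobuf::PhysicalExprNode`] into an `Arc<dyn PhysicalExpr>`, recursively
/// converting any nested operands. Scalar UDFs are resolved from the serialized
/// definition when present, otherwise from the [`TaskContext`] registry with `codec`
/// as a fallback; `Extension` nodes are delegated to
/// [`PhysicalExtensionCodec::try_decode_expr`].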
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
            )?,
        )),
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;

            let config_options = Arc::clone(ctx.session_config().options());

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, ctx, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}

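/// Parses a child expression that the protobuf message requires, returning an error
/// that names the missing `field` when it is absent.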
fn parse_required_physical_expr(
    expr: Option<&protobuf::PhysicalExprNode>,
    ctx: &TaskContext,
    field: &str,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec))
        .transpose()?
        .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}"))
}

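/// Parses an optional [`protobuf::PhysicalHashRepartition`] into
/// [`Partitioning::Hash`], returning `Ok(None)` when absent.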
pub fn parse_protobuf_hash_partitioning(
    partitioning: Option<&protobuf::PhysicalHashRepartition>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(hash_part) => {
            let expr =
                parse_physical_exprs(&hash_part.hash_expr, ctx, input_schema, codec)?;

            Ok(Some(Partitioning::Hash(
                expr,
                hash_part.partition_count.try_into().unwrap(),
            )))
        }
        None => Ok(None),
    }
}

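/// Parses an optional [`protobuf::Partitioning`] into a [`Partitioning`], covering the
/// round-robin, hash, and unknown partitioning variants.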
pub fn parse_protobuf_partitioning(
    partitioning: Option<&protobuf::Partitioning>,
    ctx: &TaskContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(protobuf::Partitioning { partition_method }) => match partition_method {
            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                partition_count,
            )) => Ok(Some(Partitioning::RoundRobinBatch(
                *partition_count as usize,
            ))),
            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
                parse_protobuf_hash_partitioning(
                    Some(hash_repartition),
                    ctx,
                    input_schema,
                    codec,
                )
            }
            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
                Ok(Some(Partitioning::UnknownPartitioning(
                    *partition_count as usize,
                )))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}

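/// Parses the table schema (partition columns included) out of a
/// [`protobuf::FileScanExecConf`].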
pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}

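/// Parses a [`protobuf::FileScanExecConf`] into a [`FileScanConfig`] for the given
/// `file_source`, restoring the file groups, projection, statistics, partition
/// columns, output ordering, limit, and batch size.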
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &TaskContext,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

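    // The file schema is the table schema minus the partition columns, whose values are
    // derived from the directory layout rather than read from the files.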
    let file_schema = Arc::new(
        Schema::new(
            schema
                .fields()
                .iter()
                .filter(|field| !table_partition_cols.contains(field))
                .cloned()
                .collect::<Vec<_>>(),
        )
        .with_metadata(schema.metadata.clone()),
    );

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_projection_indices(Some(projection))
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_table_partition_cols(table_partition_cols)
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}

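/// Decodes a buffer in the Arrow IPC stream format into a list of [`RecordBatch`]es;
/// an empty buffer yields an empty vector.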
pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
    if buf.is_empty() {
        return Ok(vec![]);
    }
    let reader = StreamReader::try_new(buf, None)?;
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}

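// The serialized path is restored with `Path::parse`, which keeps percent-encoded
// segments exactly as written (see the round-trip test at the end of this file).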
impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(val.path.as_str()).map_err(|e| {
                    proto_error(format!("Invalid object_store path: {e}"))
                })?,
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        Ok(FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&protobuf::FileGroup> for FileGroup {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
        let files = val
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(FileGroup::new(files))
    }
}

impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}

impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use datafusion_datasource::PartitionedFile;
    use object_store::path::Path;
    use object_store::ObjectMeta;

    #[test]
    fn partitioned_file_path_roundtrip_percent_encoded() {
        let path_str = "foo/foo%2Fbar/baz%252Fqux";
        let pf = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::parse(path_str).unwrap(),
                last_modified: Utc.timestamp_nanos(1_000),
                size: 42,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        };

        let proto = protobuf::PartitionedFile::try_from(&pf).unwrap();
        assert_eq!(proto.path, path_str);

        let pf2 = PartitionedFile::try_from(&proto).unwrap();
        assert_eq!(pf2.object_meta.location.as_ref(), path_str);
        assert_eq!(pf2.object_meta.location, pf.object_meta.location);
        assert_eq!(pf2.object_meta.size, pf.object_meta.size);
        assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified);
    }

    #[test]
    fn partitioned_file_from_proto_invalid_path() {
        let proto = protobuf::PartitionedFile {
            path: "foo//bar".to_string(),
            size: 1,
            last_modified_ns: 0,
            partition_values: vec![],
            range: None,
            statistics: None,
        };

        let err = PartitionedFile::try_from(&proto).unwrap_err();
        assert!(err.to_string().contains("Invalid object_store path"));
    }
}