1use std::sync::Arc;
21
22use arrow::compute::SortOptions;
23use arrow::datatypes::Field;
24use chrono::{TimeZone, Utc};
25use datafusion_expr::dml::InsertOp;
26use object_store::path::Path;
27use object_store::ObjectMeta;
28
29use datafusion::arrow::datatypes::Schema;
30use datafusion::datasource::file_format::csv::CsvSink;
31use datafusion::datasource::file_format::json::JsonSink;
32#[cfg(feature = "parquet")]
33use datafusion::datasource::file_format::parquet::ParquetSink;
34use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
35use datafusion::datasource::object_store::ObjectStoreUrl;
36use datafusion::datasource::physical_plan::{
37 FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
38};
39use datafusion::execution::FunctionRegistry;
40use datafusion::logical_expr::WindowFunctionDefinition;
41use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
42use datafusion::physical_plan::expressions::{
43 in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
44 Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
45};
46use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
47use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
48use datafusion_common::{not_impl_err, DataFusionError, Result};
49use datafusion_proto_common::common::proto_error;
50
51use crate::convert_required;
52use crate::logical_plan::{self};
53use crate::protobuf;
54use crate::protobuf::physical_expr_node::ExprType;
55
56use super::PhysicalExtensionCodec;
57
58impl From<&protobuf::PhysicalColumn> for Column {
59 fn from(c: &protobuf::PhysicalColumn) -> Column {
60 Column::new(&c.name, c.index as usize)
61 }
62}
63
64pub fn parse_physical_sort_expr(
74 proto: &protobuf::PhysicalSortExprNode,
75 registry: &dyn FunctionRegistry,
76 input_schema: &Schema,
77 codec: &dyn PhysicalExtensionCodec,
78) -> Result<PhysicalSortExpr> {
79 if let Some(expr) = &proto.expr {
80 let expr = parse_physical_expr(expr.as_ref(), registry, input_schema, codec)?;
81 let options = SortOptions {
82 descending: !proto.asc,
83 nulls_first: proto.nulls_first,
84 };
85 Ok(PhysicalSortExpr { expr, options })
86 } else {
87 Err(proto_error("Unexpected empty physical expression"))
88 }
89}
90
91pub fn parse_physical_sort_exprs(
101 proto: &[protobuf::PhysicalSortExprNode],
102 registry: &dyn FunctionRegistry,
103 input_schema: &Schema,
104 codec: &dyn PhysicalExtensionCodec,
105) -> Result<LexOrdering> {
106 proto
107 .iter()
108 .map(|sort_expr| {
109 parse_physical_sort_expr(sort_expr, registry, input_schema, codec)
110 })
111 .collect::<Result<LexOrdering>>()
112}
113
/// Deserializes a protobuf window-expression node into a physical
/// [`WindowExpr`].
///
/// The function definition is resolved in this order: an inline
/// `fun_definition` payload decoded via `codec` wins; otherwise the name is
/// looked up in `registry`, falling back to the codec with an empty payload.
///
/// # Errors
/// Returns an error when `window_frame` or `window_function` is missing, or
/// when any argument, partition key, or sort key fails to deserialize.
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    // Function arguments, PARTITION BY keys, and ORDER BY keys.
    let window_node_expr =
        parse_physical_exprs(&proto.args, registry, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;

    let order_by =
        parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;

    // The frame is a required field: decode it when present, otherwise fail
    // with an internal error naming the missing field.
    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| DataFusionError::Internal(format!("{e}")))?
        .ok_or_else(|| {
            DataFusionError::Internal(
                "Missing required field 'window_frame' in protobuf".to_string(),
            )
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                // Inline definition takes precedence; else registry lookup,
                // else codec decode with an empty buffer.
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => registry.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                // Same resolution order as the UDAF branch above.
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => registry.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?
                })
            }
        }
    } else {
        return Err(proto_error("Missing required field in protobuf"));
    };

    let name = proto.name.clone();
    // Extend the input schema with the window expression's output field so
    // `create_window_expr` can resolve its result type.
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        order_by.as_ref(),
        Arc::new(window_frame),
        &extended_schema,
        false, // NOTE(review): presumably `ignore_nulls` — confirm against create_window_expr's signature
    )
}
184
185pub fn parse_physical_exprs<'a, I>(
186 protos: I,
187 registry: &dyn FunctionRegistry,
188 input_schema: &Schema,
189 codec: &dyn PhysicalExtensionCodec,
190) -> Result<Vec<Arc<dyn PhysicalExpr>>>
191where
192 I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
193{
194 protos
195 .into_iter()
196 .map(|p| parse_physical_expr(p, registry, input_schema, codec))
197 .collect::<Result<Vec<_>>>()
198}
199
/// Deserializes a protobuf expression node into an `Arc<dyn PhysicalExpr>`.
///
/// Dispatches on the node's `expr_type` variant. Extension nodes are
/// delegated to `codec`; UDF names are resolved via an inline definition,
/// then `registry`, then the codec with an empty payload. Aggregate, window,
/// and sort nodes are rejected here because they are not stand-alone
/// physical expressions.
///
/// # Errors
/// Returns an error when the node is empty, a required child expression is
/// missing, a type conversion fails, or a UDF cannot be resolved.
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        // Binary operators: both operands are required children.
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                registry,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                registry,
                "right",
                input_schema,
                codec,
            )?,
        )),
        // Aggregate/window/sort nodes only appear inside their dedicated plan
        // nodes and cannot be converted as free-standing expressions.
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            registry,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        // IN list: the `in_list` helper validates the list against the schema.
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, registry, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        // CASE: the base expression and ELSE branch are optional; each
        // WHEN/THEN pair is required.
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            registry,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            registry,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            // No cast options are serialized; use the defaults.
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            // Inline definition takes precedence; else registry lookup, else
            // codec decode with an empty buffer.
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => registry
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?;

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    // Only the return type is serialized; the field name "f"
                    // is a placeholder.
                    Field::new("f", convert_required!(e.return_type)?, true).into(),
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                registry,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        // Extension nodes: recursively parse the children, then hand the
        // opaque payload plus children to the codec.
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, registry, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}
406
407fn parse_required_physical_expr(
408 expr: Option<&protobuf::PhysicalExprNode>,
409 registry: &dyn FunctionRegistry,
410 field: &str,
411 input_schema: &Schema,
412 codec: &dyn PhysicalExtensionCodec,
413) -> Result<Arc<dyn PhysicalExpr>> {
414 expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
415 .transpose()?
416 .ok_or_else(|| {
417 DataFusionError::Internal(format!("Missing required field {field:?}"))
418 })
419}
420
421pub fn parse_protobuf_hash_partitioning(
422 partitioning: Option<&protobuf::PhysicalHashRepartition>,
423 registry: &dyn FunctionRegistry,
424 input_schema: &Schema,
425 codec: &dyn PhysicalExtensionCodec,
426) -> Result<Option<Partitioning>> {
427 match partitioning {
428 Some(hash_part) => {
429 let expr = parse_physical_exprs(
430 &hash_part.hash_expr,
431 registry,
432 input_schema,
433 codec,
434 )?;
435
436 Ok(Some(Partitioning::Hash(
437 expr,
438 hash_part.partition_count.try_into().unwrap(),
439 )))
440 }
441 None => Ok(None),
442 }
443}
444
445pub fn parse_protobuf_partitioning(
446 partitioning: Option<&protobuf::Partitioning>,
447 registry: &dyn FunctionRegistry,
448 input_schema: &Schema,
449 codec: &dyn PhysicalExtensionCodec,
450) -> Result<Option<Partitioning>> {
451 match partitioning {
452 Some(protobuf::Partitioning { partition_method }) => match partition_method {
453 Some(protobuf::partitioning::PartitionMethod::RoundRobin(
454 partition_count,
455 )) => Ok(Some(Partitioning::RoundRobinBatch(
456 *partition_count as usize,
457 ))),
458 Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
459 parse_protobuf_hash_partitioning(
460 Some(hash_repartition),
461 registry,
462 input_schema,
463 codec,
464 )
465 }
466 Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
467 Ok(Some(Partitioning::UnknownPartitioning(
468 *partition_count as usize,
469 )))
470 }
471 None => Ok(None),
472 },
473 None => Ok(None),
474 }
475}
476
477pub fn parse_protobuf_file_scan_schema(
478 proto: &protobuf::FileScanExecConf,
479) -> Result<Arc<Schema>> {
480 Ok(Arc::new(convert_required!(proto.schema)?))
481}
482
/// Deserializes a protobuf [`protobuf::FileScanExecConf`] into a
/// [`FileScanConfig`].
///
/// The serialized schema contains both data columns and table partition
/// columns; partition columns are split out below so the builder receives a
/// file schema holding only the columns physically present in the files.
///
/// # Errors
/// Fails when any required field is missing, a URL does not parse, a named
/// partition column is absent from the schema, or a sort expression cannot
/// be deserialized.
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    registry: &dyn FunctionRegistry,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    // Full table schema (data + partition columns).
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    // An empty URL denotes the local filesystem object store.
    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    // Partition columns are serialized by name only; resolve each name
    // against the full schema to recover its field definition.
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

    // File schema = full schema minus partition columns (their values come
    // from the path, not the file contents).
    let file_schema = Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .filter(|field| !table_partition_cols.contains(field))
            .cloned()
            .collect::<Vec<_>>(),
    ));

    // Output orderings are parsed against the full schema, so they may
    // reference partition columns as well as data columns.
    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_expr = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            registry,
            &schema,
            codec,
        )?;
        output_ordering.push(sort_expr);
    }

    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_projection(Some(projection))
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_table_partition_cols(table_partition_cols)
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}
552
impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    /// Rebuilds a [`PartitionedFile`] from its protobuf form.
    ///
    /// Fields with no serialized counterpart (`e_tag`, `version`,
    /// `extensions`, `metadata_size_hint`) are reset to `None`.
    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(val.path.as_str()),
                // NOTE(review): `as i64` wraps silently if the stored
                // nanosecond timestamp exceeds i64::MAX — confirm the
                // serializer guarantees the range.
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            // Optional byte range within the file.
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}
581
582impl TryFrom<&protobuf::FileRange> for FileRange {
583 type Error = DataFusionError;
584
585 fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
586 Ok(FileRange {
587 start: value.start,
588 end: value.end,
589 })
590 }
591}
592
593impl TryFrom<&protobuf::FileGroup> for FileGroup {
594 type Error = DataFusionError;
595
596 fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
597 let files = val
598 .files
599 .iter()
600 .map(|f| f.try_into())
601 .collect::<Result<Vec<_>, _>>()?;
602 Ok(FileGroup::new(files))
603 }
604}
605
606impl TryFrom<&protobuf::JsonSink> for JsonSink {
607 type Error = DataFusionError;
608
609 fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
610 Ok(Self::new(
611 convert_required!(value.config)?,
612 convert_required!(value.writer_options)?,
613 ))
614 }
615}
616
617#[cfg(feature = "parquet")]
618impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
619 type Error = DataFusionError;
620
621 fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
622 Ok(Self::new(
623 convert_required!(value.config)?,
624 convert_required!(value.parquet_options)?,
625 ))
626 }
627}
628
629impl TryFrom<&protobuf::CsvSink> for CsvSink {
630 type Error = DataFusionError;
631
632 fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
633 Ok(Self::new(
634 convert_required!(value.config)?,
635 convert_required!(value.writer_options)?,
636 ))
637 }
638}
639
impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    /// Rebuilds a [`FileSinkConfig`] from its protobuf form.
    ///
    /// # Errors
    /// Fails when a file, table path, partition-column type, or the output
    /// schema cannot be deserialized.
    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        // All serialized files are gathered into a single file group.
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        // Partition columns are serialized as (name, arrow type) pairs.
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            // NOTE(review): the original URL is not serialized, so it is
            // reset to an empty string — confirm downstream consumers
            // tolerate this.
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}