use std::sync::Arc;

use arrow::compute::SortOptions;
use chrono::{TimeZone, Utc};
use datafusion_expr::dml::InsertOp;
use object_store::path::Path;
use object_store::ObjectMeta;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::csv::CsvSink;
use datafusion::datasource::file_format::json::JsonSink;
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::WindowFunctionDefinition;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
    in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_proto_common::common::proto_error;

use crate::convert_required;
use crate::logical_plan::{self};
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;

use super::PhysicalExtensionCodec;
impl From<&protobuf::PhysicalColumn> for Column {
    fn from(c: &protobuf::PhysicalColumn) -> Column {
        Column::new(&c.name, c.index as usize)
    }
}

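/// Parses a [`PhysicalSortExpr`] from its protobuf representation.
///
/// `registry` resolves UDF names, `input_schema` supplies the types of any
/// referenced columns, and `codec` decodes extension expression nodes.
///
/// A minimal usage sketch (the `node`, `registry`, `schema`, and `codec`
/// bindings are assumed to be in scope, hence `ignore`):
///
/// ```ignore
/// let sort_expr = parse_physical_sort_expr(&node, &registry, &schema, &codec)?;
/// assert_eq!(sort_expr.options.nulls_first, node.nulls_first);
/// ```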
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        let expr = parse_physical_expr(expr.as_ref(), registry, input_schema, codec)?;
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}

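/// Parses a slice of protobuf sort expression nodes into a [`LexOrdering`],
/// preserving the order of the input slice.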
pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<LexOrdering> {
    proto
        .iter()
        .map(|sort_expr| {
            parse_physical_sort_expr(sort_expr, registry, input_schema, codec)
        })
        .collect::<Result<LexOrdering>>()
}

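/// Parses a [`WindowExpr`] from its protobuf representation.
///
/// Reconstructs the window function itself (a UDAF or UDWF, decoded through
/// `codec` when a serialized definition is embedded, otherwise looked up by
/// name in `registry`), its arguments, PARTITION BY and ORDER BY expressions,
/// and the window frame, then builds the physical window expression against a
/// schema extended with the window output field.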
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr =
        parse_physical_exprs(&proto.args, registry, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;

    let order_by =
        parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;

    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| DataFusionError::Internal(format!("{e}")))?
        .ok_or_else(|| {
            DataFusionError::Internal(
                "Missing required field 'window_frame' in protobuf".to_string(),
            )
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => registry.udaf(udaf_name)?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => registry.udwf(udwf_name)?,
                })
            }
        }
    } else {
        return Err(proto_error(
            "Missing required field 'window_function' in protobuf",
        ));
    };

    let name = proto.name.clone();
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        order_by.as_ref(),
        Arc::new(window_frame),
        &extended_schema,
        false,
    )
}

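/// Parses each protobuf expression node in `protos` with
/// [`parse_physical_expr`], failing fast on the first error.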
pub fn parse_physical_exprs<'a, I>(
    protos: I,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
{
    protos
        .into_iter()
        .map(|p| parse_physical_expr(p, registry, input_schema, codec))
        .collect::<Result<Vec<_>>>()
}

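/// Parses a [`PhysicalExpr`] from its protobuf representation.
///
/// Dispatches on the `expr_type` variant: columns, literals, binary and unary
/// operators, CASE, CAST, TRY_CAST, IN list, scalar UDFs, LIKE, and custom
/// extension nodes (delegated to `codec`). Aggregate, window, and sort nodes
/// are rejected here because they are not standalone physical expressions.
///
/// A minimal sketch of decoding a single column reference (field names follow
/// the generated protobuf types; `ignore`d because the surrounding bindings
/// are assumed):
///
/// ```ignore
/// let node = protobuf::PhysicalExprNode {
///     expr_type: Some(ExprType::Column(protobuf::PhysicalColumn {
///         name: "a".to_string(),
///         index: 0,
///     })),
/// };
/// let expr = parse_physical_expr(&node, &registry, &schema, &codec)?;
/// ```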
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                registry,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                registry,
                "right",
                input_schema,
                codec,
            )?,
        )),
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            registry,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, registry, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            registry,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            registry,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            // Prefer a UDF definition embedded in the plan (decoded through
            // the codec); otherwise fall back to a lookup by name in the
            // registry.
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => registry.udf(e.name.as_str())?,
            };
            let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?;

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    udf,
                    args,
                    convert_required!(e.return_type)?,
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                registry,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                registry,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, registry, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}

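/// Parses an optional protobuf expression, returning an internal error that
/// names `field` when the expression is absent.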
fn parse_required_physical_expr(
    expr: Option<&protobuf::PhysicalExprNode>,
    registry: &dyn FunctionRegistry,
    field: &str,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
        .transpose()?
        .ok_or_else(|| {
            DataFusionError::Internal(format!("Missing required field {field:?}"))
        })
}

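/// Parses an optional [`Partitioning::Hash`] from a protobuf hash repartition
/// message; a `None` input yields `Ok(None)`.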
pub fn parse_protobuf_hash_partitioning(
    partitioning: Option<&protobuf::PhysicalHashRepartition>,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(hash_part) => {
            let expr = parse_physical_exprs(
                &hash_part.hash_expr,
                registry,
                input_schema,
                codec,
            )?;

            // Surface an error rather than panicking if the serialized count
            // does not fit in usize on this platform.
            let partition_count = hash_part.partition_count.try_into().map_err(|_| {
                proto_error("partition_count does not fit in usize")
            })?;
            Ok(Some(Partitioning::Hash(expr, partition_count)))
        }
        None => Ok(None),
    }
}

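/// Parses an optional [`Partitioning`] (round-robin, hash, or unknown) from
/// its protobuf representation. A missing partitioning or partition method
/// yields `Ok(None)` rather than an error.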
pub fn parse_protobuf_partitioning(
    partitioning: Option<&protobuf::Partitioning>,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(protobuf::Partitioning { partition_method }) => match partition_method {
            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                partition_count,
            )) => Ok(Some(Partitioning::RoundRobinBatch(
                *partition_count as usize,
            ))),
            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
                parse_protobuf_hash_partitioning(
                    Some(hash_repartition),
                    registry,
                    input_schema,
                    codec,
                )
            }
            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
                Ok(Some(Partitioning::UnknownPartitioning(
                    *partition_count as usize,
                )))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}

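/// Extracts the full table schema (including any partition columns) from a
/// [`protobuf::FileScanExecConf`].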
pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}

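/// Parses a [`FileScanConfig`] from its protobuf representation.
///
/// The schema carried by the proto includes the table partition columns; they
/// are split back out here so that the file schema matches the bytes actually
/// stored in the files, while the partition columns are re-attached through
/// the builder.
///
/// A minimal usage sketch (a Parquet file source is assumed purely for
/// illustration, hence `ignore`):
///
/// ```ignore
/// let source = Arc::new(ParquetSource::default());
/// let config = parse_protobuf_file_scan_config(&proto, &registry, &codec, source)?;
/// ```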
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    registry: &dyn FunctionRegistry,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

    // The partition columns appear in the proto schema only so that their
    // types survive serialization; the data files themselves do not contain
    // them, so strip them back out of the file schema here.
    let file_schema = Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .filter(|field| !table_partition_cols.contains(field))
            .cloned()
            .collect::<Vec<_>>(),
    ));

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_expr = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            registry,
            &schema,
            codec,
        )?;
        output_ordering.push(sort_expr);
    }

    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_projection(Some(projection))
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_table_partition_cols(table_partition_cols)
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}

impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(val.path.as_str()),
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        Ok(FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&protobuf::FileGroup> for FileGroup {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
        let files = val
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(FileGroup::new(files))
    }
}

impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}

impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}
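
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch exercising the `From<&protobuf::PhysicalColumn>` impl
    // above; it assumes the generated message exposes exactly the `name` and
    // `index` fields that the impl reads.
    #[test]
    fn physical_column_from_proto() {
        let proto = protobuf::PhysicalColumn {
            name: "a".to_string(),
            index: 0,
        };
        let column = Column::from(&proto);
        assert_eq!(column.name(), "a");
        assert_eq!(column.index(), 0);
    }
}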