use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::ipc::reader::StreamReader;
use chrono::{TimeZone, Utc};
use datafusion_expr::dml::InsertOp;
use object_store::path::Path;
use object_store::ObjectMeta;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::csv::CsvSink;
use datafusion::datasource::file_format::json::JsonSink;
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::WindowFunctionDefinition;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
    in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
    Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_proto_common::common::proto_error;

use crate::convert_required;
use crate::logical_plan::{self};
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;

use super::PhysicalExtensionCodec;

impl From<&protobuf::PhysicalColumn> for Column {
    fn from(c: &protobuf::PhysicalColumn) -> Column {
        Column::new(&c.name, c.index as usize)
    }
}

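/// Parses a physical sort expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with the physical sort expression node.
/// * `ctx` - The session context, used to look up user-defined functions
///   referenced by the expression.
/// * `input_schema` - The Arrow schema of the input, used to resolve column
///   references and determine expression data types.
/// * `codec` - An extension codec used to decode custom UDFs.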
pub fn parse_physical_sort_expr(
    proto: &protobuf::PhysicalSortExprNode,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
    if let Some(expr) = &proto.expr {
        let expr = parse_physical_expr(expr.as_ref(), ctx, input_schema, codec)?;
        let options = SortOptions {
            descending: !proto.asc,
            nulls_first: proto.nulls_first,
        };
        Ok(PhysicalSortExpr { expr, options })
    } else {
        Err(proto_error("Unexpected empty physical expression"))
    }
}

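/// Parses a collection of physical sort expressions from protobufs.
///
/// See [`parse_physical_sort_expr`] for the meaning of the arguments; this
/// applies it to each node in `proto`, preserving order.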
pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
    proto
        .iter()
        .map(|sort_expr| parse_physical_sort_expr(sort_expr, ctx, input_schema, codec))
        .collect()
}

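/// Parses a physical window expression from a protobuf.
///
/// Resolves the window function (a UDAF or UDWF) by name through the session
/// context, falling back to `codec` when the name is unknown locally or when
/// the proto carries an encoded function definition.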
pub fn parse_physical_window_expr(
    proto: &protobuf::PhysicalWindowExprNode,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
    let window_node_expr = parse_physical_exprs(&proto.args, ctx, input_schema, codec)?;
    let partition_by =
        parse_physical_exprs(&proto.partition_by, ctx, input_schema, codec)?;

    let order_by = parse_physical_sort_exprs(&proto.order_by, ctx, input_schema, codec)?;

    let window_frame = proto
        .window_frame
        .as_ref()
        .map(|wf| wf.clone().try_into())
        .transpose()
        .map_err(|e| DataFusionError::Internal(format!("{e}")))?
        .ok_or_else(|| {
            DataFusionError::Internal(
                "Missing required field 'window_frame' in protobuf".to_string(),
            )
        })?;

    let fun = if let Some(window_func) = proto.window_function.as_ref() {
        match window_func {
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => {
                WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
                    None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
                })
            }
            protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => {
                WindowFunctionDefinition::WindowUDF(match &proto.fun_definition {
                    Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
                    None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
                })
            }
        }
    } else {
        return Err(proto_error(
            "Missing required field 'window_function' in protobuf",
        ));
    };

    let name = proto.name.clone();
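    // Extend the input schema with the window function's output field; the
    // window expression is created against this extended schema.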
    let extended_schema =
        schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
    create_window_expr(
        &fun,
        name,
        &window_node_expr,
        &partition_by,
        &order_by,
        Arc::new(window_frame),
        &extended_schema,
        proto.ignore_nulls,
        proto.distinct,
        None,
    )
}

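/// Parses a collection of physical expressions from protobufs, in order.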
pub fn parse_physical_exprs<'a, I>(
    protos: I,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a protobuf::PhysicalExprNode>,
{
    protos
        .into_iter()
        .map(|p| parse_physical_expr(p, ctx, input_schema, codec))
        .collect::<Result<Vec<_>>>()
}

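/// Parses a physical expression from a protobuf.
///
/// # Arguments
///
/// * `proto` - Input proto with the physical expression node.
/// * `ctx` - The session context, used to look up user-defined functions
///   referenced by the expression.
/// * `input_schema` - The Arrow schema of the input, used to resolve column
///   references and determine expression data types.
/// * `codec` - An extension codec used to decode custom UDFs.
///
/// # Example
///
/// A minimal round-trip sketch (not compiled here): assumes an existing
/// `expr: Arc<dyn PhysicalExpr>` and its input `schema`, and uses this
/// crate's `DefaultPhysicalExtensionCodec` and `serialize_physical_expr`.
///
/// ```ignore
/// let ctx = SessionContext::new();
/// let codec = DefaultPhysicalExtensionCodec {};
/// // Encode the expression, then decode it back against the same schema.
/// let node = serialize_physical_expr(&expr, &codec)?;
/// let decoded = parse_physical_expr(&node, &ctx, &schema, &codec)?;
/// ```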
pub fn parse_physical_expr(
    proto: &protobuf::PhysicalExprNode,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    let expr_type = proto
        .expr_type
        .as_ref()
        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;

    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
        ExprType::Column(c) => {
            let pcol: Column = c.into();
            Arc::new(pcol)
        }
        ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
            parse_required_physical_expr(
                binary_expr.l.as_deref(),
                ctx,
                "left",
                input_schema,
                codec,
            )?,
            logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
            parse_required_physical_expr(
                binary_expr.r.as_deref(),
                ctx,
                "right",
                input_schema,
                codec,
            )?,
        )),
        ExprType::AggregateExpr(_) => {
            return not_impl_err!(
                "Cannot convert aggregate expr node to physical expression"
            );
        }
        ExprType::WindowExpr(_) => {
            return not_impl_err!(
                "Cannot convert window expr node to physical expression"
            );
        }
        ExprType::Sort(_) => {
            return not_impl_err!("Cannot convert sort expr node to physical expression");
        }
        ExprType::IsNullExpr(e) => {
            Arc::new(IsNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::IsNotNullExpr(e) => {
            Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
            e.expr.as_deref(),
            ctx,
            "expr",
            input_schema,
            codec,
        )?)),
        ExprType::Negative(e) => {
            Arc::new(NegativeExpr::new(parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?))
        }
        ExprType::InList(e) => in_list(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_physical_exprs(&e.list, ctx, input_schema, codec)?,
            &e.negated,
            input_schema,
        )?,
        ExprType::Case(e) => Arc::new(CaseExpr::try_new(
            e.expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
            e.when_then_expr
                .iter()
                .map(|e| {
                    Ok((
                        parse_required_physical_expr(
                            e.when_expr.as_ref(),
                            ctx,
                            "when_expr",
                            input_schema,
                            codec,
                        )?,
                        parse_required_physical_expr(
                            e.then_expr.as_ref(),
                            ctx,
                            "then_expr",
                            input_schema,
                            codec,
                        )?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?,
            e.else_expr
                .as_ref()
                .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec))
                .transpose()?,
        )?),
        ExprType::Cast(e) => Arc::new(CastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
            None,
        )),
        ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
            parse_required_physical_expr(
                e.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            convert_required!(e.arrow_type)?,
        )),
        ExprType::ScalarUdf(e) => {
            let udf = match &e.fun_definition {
                Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                None => ctx
                    .udf(e.name.as_str())
                    .or_else(|_| codec.try_decode_udf(&e.name, &[]))?,
            };
            let scalar_fun_def = Arc::clone(&udf);

            let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;
            let config_options =
                match ctx.state().execution_props().config_options.as_ref() {
                    Some(config_options) => Arc::clone(config_options),
                    None => Arc::new(ConfigOptions::default()),
                };

            Arc::new(
                ScalarFunctionExpr::new(
                    e.name.as_str(),
                    scalar_fun_def,
                    args,
                    Field::new(
                        &e.return_field_name,
                        convert_required!(e.return_type)?,
                        true,
                    )
                    .into(),
                    config_options,
                )
                .with_nullable(e.nullable),
            )
        }
        ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
            like_expr.negated,
            like_expr.case_insensitive,
            parse_required_physical_expr(
                like_expr.expr.as_deref(),
                ctx,
                "expr",
                input_schema,
                codec,
            )?,
            parse_required_physical_expr(
                like_expr.pattern.as_deref(),
                ctx,
                "pattern",
                input_schema,
                codec,
            )?,
        )),
        ExprType::Extension(extension) => {
            let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
                .inputs
                .iter()
                .map(|e| parse_physical_expr(e, ctx, input_schema, codec))
                .collect::<Result<_>>()?;
            (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
        }
    };

    Ok(pexpr)
}

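/// Parses a physical expression that the enclosing proto message requires to
/// be present, returning an internal error naming `field` if it is missing.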
fn parse_required_physical_expr(
    expr: Option<&protobuf::PhysicalExprNode>,
    ctx: &SessionContext,
    field: &str,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec))
        .transpose()?
        .ok_or_else(|| {
            DataFusionError::Internal(format!("Missing required field {field:?}"))
        })
}

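/// Parses an optional [`Partitioning::Hash`] from a protobuf hash repartition
/// node, resolving the hash expressions against `input_schema`.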
pub fn parse_protobuf_hash_partitioning(
    partitioning: Option<&protobuf::PhysicalHashRepartition>,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(hash_part) => {
            let expr =
                parse_physical_exprs(&hash_part.hash_expr, ctx, input_schema, codec)?;

            Ok(Some(Partitioning::Hash(
                expr,
                hash_part.partition_count.try_into().unwrap(),
            )))
        }
        None => Ok(None),
    }
}

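/// Parses an optional [`Partitioning`] (round-robin, hash, or unknown) from
/// its protobuf representation.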
pub fn parse_protobuf_partitioning(
    partitioning: Option<&protobuf::Partitioning>,
    ctx: &SessionContext,
    input_schema: &Schema,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
    match partitioning {
        Some(protobuf::Partitioning { partition_method }) => match partition_method {
            Some(protobuf::partitioning::PartitionMethod::RoundRobin(
                partition_count,
            )) => Ok(Some(Partitioning::RoundRobinBatch(
                *partition_count as usize,
            ))),
            Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => {
                parse_protobuf_hash_partitioning(
                    Some(hash_repartition),
                    ctx,
                    input_schema,
                    codec,
                )
            }
            Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
                Ok(Some(Partitioning::UnknownPartitioning(
                    *partition_count as usize,
                )))
            }
            None => Ok(None),
        },
        None => Ok(None),
    }
}

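/// Parses the schema embedded in a [`protobuf::FileScanExecConf`]. Note that
/// this schema still includes any table partition columns; see
/// [`parse_protobuf_file_scan_config`] for how those are split back out.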
pub fn parse_protobuf_file_scan_schema(
    proto: &protobuf::FileScanExecConf,
) -> Result<Arc<Schema>> {
    Ok(Arc::new(convert_required!(proto.schema)?))
}

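/// Parses a [`FileScanConfig`] from its protobuf representation.
///
/// The decoded schema carries both file columns and table partition columns;
/// the partition columns are separated out below so that the resulting
/// config's file schema matches what is actually stored in the files.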
pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    ctx: &SessionContext,
    codec: &dyn PhysicalExtensionCodec,
    file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();

    let constraints = convert_required!(proto.constraints)?;
    let statistics = convert_required!(proto.statistics)?;

    let file_groups = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

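    // Remove the partition columns from the file schema: partition values are
    // not stored in the files themselves; the columns are carried in the proto
    // only so their types can be reconstructed after serde.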
    let file_schema = Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .filter(|field| !table_partition_cols.contains(field))
            .cloned()
            .collect::<Vec<_>>(),
    ));

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_exprs = parse_physical_sort_exprs(
            &node_collection.physical_sort_expr_nodes,
            ctx,
            &schema,
            codec,
        )?;
        output_ordering.extend(LexOrdering::new(sort_exprs));
    }

    let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
        .with_file_groups(file_groups)
        .with_constraints(constraints)
        .with_statistics(statistics)
        .with_projection(Some(projection))
        .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
        .with_table_partition_cols(table_partition_cols)
        .with_output_ordering(output_ordering)
        .with_batch_size(proto.batch_size.map(|s| s as usize))
        .build();
    Ok(config)
}

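/// Decodes zero or more [`RecordBatch`]es from bytes in the Arrow IPC stream
/// format (as produced by `arrow::ipc::writer::StreamWriter`). An empty
/// buffer decodes to an empty vector rather than an error.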
pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
    if buf.is_empty() {
        return Ok(vec![]);
    }
    let reader = StreamReader::try_new(buf, None)?;
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}

impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
        Ok(PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(val.path.as_str()),
                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
                size: val.size,
                e_tag: None,
                version: None,
            },
            partition_values: val
                .partition_values
                .iter()
                .map(|v| v.try_into())
                .collect::<Result<Vec<_>, _>>()?,
            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
            statistics: val
                .statistics
                .as_ref()
                .map(|v| v.try_into().map(Arc::new))
                .transpose()?,
            extensions: None,
            metadata_size_hint: None,
        })
    }
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        Ok(FileRange {
            start: value.start,
            end: value.end,
        })
    }
}

impl TryFrom<&protobuf::FileGroup> for FileGroup {
    type Error = DataFusionError;

    fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
        let files = val
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(FileGroup::new(files))
    }
}

impl TryFrom<&protobuf::JsonSink> for JsonSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::JsonSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

#[cfg(feature = "parquet")]
impl TryFrom<&protobuf::ParquetSink> for ParquetSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::ParquetSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.parquet_options)?,
        ))
    }
}

impl TryFrom<&protobuf::CsvSink> for CsvSink {
    type Error = DataFusionError;

    fn try_from(value: &protobuf::CsvSink) -> Result<Self, Self::Error> {
        Ok(Self::new(
            convert_required!(value.config)?,
            convert_required!(value.writer_options)?,
        ))
    }
}

impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
    type Error = DataFusionError;

    fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
        let file_group = FileGroup::new(
            conf.file_groups
                .iter()
                .map(|f| f.try_into())
                .collect::<Result<Vec<_>>>()?,
        );
        let table_paths = conf
            .table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;
        let table_partition_cols = conf
            .table_partition_cols
            .iter()
            .map(|protobuf::PartitionColumn { name, arrow_type }| {
                let data_type = convert_required!(arrow_type)?;
                Ok((name.clone(), data_type))
            })
            .collect::<Result<Vec<_>>>()?;
        let insert_op = match conf.insert_op() {
            protobuf::InsertOp::Append => InsertOp::Append,
            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
            protobuf::InsertOp::Replace => InsertOp::Replace,
        };
        Ok(Self {
            original_url: String::default(),
            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
            file_group,
            table_paths,
            output_schema: Arc::new(convert_required!(conf.output_schema)?),
            table_partition_cols,
            insert_op,
            keep_partition_by_columns: conf.keep_partition_by_columns,
            file_extension: conf.file_extension.clone(),
        })
    }
}