1use std::fmt::Debug;
19use std::sync::Arc;
20
21use datafusion::physical_expr::aggregate::AggregateExprBuilder;
22use prost::bytes::BufMut;
23use prost::Message;
24
25use datafusion::arrow::compute::SortOptions;
26use datafusion::arrow::datatypes::SchemaRef;
27use datafusion::datasource::file_format::csv::CsvSink;
28use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
29use datafusion::datasource::file_format::json::JsonSink;
30#[cfg(feature = "parquet")]
31use datafusion::datasource::file_format::parquet::ParquetSink;
32#[cfg(feature = "parquet")]
33use datafusion::datasource::physical_plan::ParquetSource;
34use datafusion::datasource::physical_plan::{AvroSource, CsvSource, FileScanConfig};
35use datafusion::datasource::source::DataSourceExec;
36use datafusion::execution::runtime_env::RuntimeEnv;
37use datafusion::execution::FunctionRegistry;
38use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
39use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
40use datafusion::physical_plan::aggregates::AggregateMode;
41use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
42use datafusion::physical_plan::analyze::AnalyzeExec;
43use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
44use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
45use datafusion::physical_plan::empty::EmptyExec;
46use datafusion::physical_plan::explain::ExplainExec;
47use datafusion::physical_plan::expressions::PhysicalSortExpr;
48use datafusion::physical_plan::filter::FilterExec;
49use datafusion::physical_plan::insert::DataSinkExec;
50use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
51use datafusion::physical_plan::joins::{
52 CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
53};
54use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
55use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
56use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
57use datafusion::physical_plan::projection::ProjectionExec;
58use datafusion::physical_plan::repartition::RepartitionExec;
59use datafusion::physical_plan::sorts::sort::SortExec;
60use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
61use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
62use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
63use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
64use datafusion::physical_plan::{
65 ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
66};
67use datafusion_common::config::TableParquetOptions;
68use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
69use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
70
71use crate::common::{byte_to_string, str_to_byte};
72use crate::physical_plan::from_proto::{
73 parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
74 parse_physical_window_expr, parse_protobuf_file_scan_config,
75 parse_protobuf_file_scan_schema,
76};
77use crate::physical_plan::to_proto::{
78 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
79 serialize_physical_window_expr,
80};
81use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
82use crate::protobuf::physical_expr_node::ExprType;
83use crate::protobuf::physical_plan_node::PhysicalPlanType;
84use crate::protobuf::{
85 self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
86};
87use crate::{convert_required, into_required};
88
89use self::from_proto::parse_protobuf_partitioning;
90use self::to_proto::{serialize_partitioning, serialize_physical_expr};
91
92pub mod from_proto;
93pub mod to_proto;
94
95impl AsExecutionPlan for protobuf::PhysicalPlanNode {
96 fn try_decode(buf: &[u8]) -> Result<Self>
97 where
98 Self: Sized,
99 {
100 protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
101 DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
102 })
103 }
104
105 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
106 where
107 B: BufMut,
108 Self: Sized,
109 {
110 self.encode(buf).map_err(|e| {
111 DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
112 })
113 }
114
    /// Recursively reconstruct an executable [`ExecutionPlan`] tree from this
    /// protobuf node.
    ///
    /// * `registry` — resolves UDAF/UDF names embedded in serialized expressions.
    /// * `runtime` — threaded through unchanged to child-node conversions.
    /// * `extension_codec` — decodes custom expressions, UDF payloads, and
    ///   user-defined `Extension` plan nodes.
    ///
    /// # Errors
    /// Returns an error when `physical_plan_type` is unset, a required child or
    /// field is missing, or an embedded enum discriminant is out of range.
    ///
    /// NOTE(review): several arms call `.unwrap()` on optional proto fields
    /// (e.g. `explain.schema`, `scan.base_conf`, join `on` columns, the
    /// repartition `partitioning`) — malformed input panics there instead of
    /// returning an error; confirm whether that is intended.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A node whose oneof variant is unset cannot be converted.
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        match plan {
            // EXPLAIN: schema + pre-stringified plan texts, no child plan.
            PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
                Arc::new(explain.schema.as_ref().unwrap().try_into()?),
                explain
                    .stringified_plans
                    .iter()
                    .map(|plan| plan.into())
                    .collect(),
                explain.verbose,
            ))),
            PhysicalPlanType::Projection(projection) => {
                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &projection.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                // Expressions and their output names are stored as two
                // parallel lists; zip them back into (expr, name) pairs.
                let exprs = projection
                    .expr
                    .iter()
                    .zip(projection.expr_name.iter())
                    .map(|(expr, name)| {
                        Ok((
                            parse_physical_expr(
                                expr,
                                registry,
                                input.schema().as_ref(),
                                extension_codec,
                            )?,
                            name.to_string(),
                        ))
                    })
                    .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
                Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
            }
            PhysicalPlanType::Filter(filter) => {
                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &filter.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                // The predicate is required: a missing expr is an internal error.
                let predicate = filter
                    .expr
                    .as_ref()
                    .map(|expr| {
                        parse_physical_expr(
                            expr,
                            registry,
                            input.schema().as_ref(),
                            extension_codec,
                        )
                    })
                    .transpose()?
                    .ok_or_else(|| {
                        DataFusionError::Internal(
                            "filter (FilterExecNode) in PhysicalPlanNode is missing."
                                .to_owned(),
                        )
                    })?;
                // Conversion of the selectivity is deferred; failure is
                // reported only after the FilterExec itself is built.
                let filter_selectivity = filter.default_filter_selectivity.try_into();
                // Empty projection list means "no projection" (None).
                let projection = if !filter.projection.is_empty() {
                    Some(
                        filter
                            .projection
                            .iter()
                            .map(|i| *i as usize)
                            .collect::<Vec<_>>(),
                    )
                } else {
                    None
                };
                let filter =
                    FilterExec::try_new(predicate, input)?.with_projection(projection)?;
                match filter_selectivity {
                    Ok(filter_selectivity) => Ok(Arc::new(
                        filter.with_default_selectivity(filter_selectivity)?,
                    )),
                    Err(_) => Err(DataFusionError::Internal(
                        "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
                    )),
                }
            }
            PhysicalPlanType::CsvScan(scan) => {
                // Optional single-byte escape character.
                let escape = if let Some(
                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape),
                ) = &scan.optional_escape
                {
                    Some(str_to_byte(escape, "escape")?)
                } else {
                    None
                };

                // Optional single-byte comment marker.
                let comment = if let Some(
                    protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
                ) = &scan.optional_comment
                {
                    Some(str_to_byte(comment, "comment")?)
                } else {
                    None
                };

                // NOTE(review): third CsvSource::new argument is hard-coded to
                // 0 — presumably the quote byte; confirm this is intentional
                // and that the quote setting is not meant to round-trip.
                let source = Arc::new(
                    CsvSource::new(
                        scan.has_header,
                        str_to_byte(&scan.delimiter, "delimiter")?,
                        0,
                    )
                    .with_escape(escape)
                    .with_comment(comment),
                );

                // NOTE(review): compression is forced to UNCOMPRESSED here —
                // verify compressed CSV scans are not expected to round-trip.
                let conf = parse_protobuf_file_scan_config(
                    scan.base_conf.as_ref().unwrap(),
                    registry,
                    extension_codec,
                    source,
                )?
                .with_newlines_in_values(scan.newlines_in_values)
                .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
                Ok(conf.build())
            }
            // Whole arm compiles only with the `parquet` feature; without it
            // the variant is still matched but panics.
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetScan(scan) => {
                #[cfg(feature = "parquet")]
                {
                    // The pruning predicate is parsed against the file schema,
                    // not the (possibly projected) output schema.
                    let schema = parse_protobuf_file_scan_schema(
                        scan.base_conf.as_ref().unwrap(),
                    )?;
                    let predicate = scan
                        .predicate
                        .as_ref()
                        .map(|expr| {
                            parse_physical_expr(
                                expr,
                                registry,
                                schema.as_ref(),
                                extension_codec,
                            )
                        })
                        .transpose()?;
                    let mut options = TableParquetOptions::default();

                    if let Some(table_options) = scan.parquet_options.as_ref() {
                        options = table_options.try_into()?;
                    }
                    let mut source = ParquetSource::new(options);

                    if let Some(predicate) = predicate {
                        source = source.with_predicate(Arc::clone(&schema), predicate);
                    }
                    let base_config = parse_protobuf_file_scan_config(
                        scan.base_conf.as_ref().unwrap(),
                        registry,
                        extension_codec,
                        Arc::new(source),
                    )?;
                    Ok(base_config.build())
                }
                #[cfg(not(feature = "parquet"))]
                panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
            }
            PhysicalPlanType::AvroScan(scan) => {
                let conf = parse_protobuf_file_scan_config(
                    scan.base_conf.as_ref().unwrap(),
                    registry,
                    extension_codec,
                    Arc::new(AvroSource::new()),
                )?;
                Ok(conf.build())
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &coalesce_batches.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                Ok(Arc::new(
                    CoalesceBatchesExec::new(
                        input,
                        coalesce_batches.target_batch_size as usize,
                    )
                    .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
                ))
            }
            // "Merge" on the wire maps to CoalescePartitionsExec.
            PhysicalPlanType::Merge(merge) => {
                let input: Arc<dyn ExecutionPlan> =
                    into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
                Ok(Arc::new(CoalescePartitionsExec::new(input)))
            }
            PhysicalPlanType::Repartition(repart) => {
                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &repart.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                let partitioning = parse_protobuf_partitioning(
                    repart.partitioning.as_ref(),
                    registry,
                    input.schema().as_ref(),
                    extension_codec,
                )?;
                // NOTE(review): panics when the partitioning field is absent.
                Ok(Arc::new(RepartitionExec::try_new(
                    input,
                    partitioning.unwrap(),
                )?))
            }
            PhysicalPlanType::GlobalLimit(limit) => {
                let input: Arc<dyn ExecutionPlan> =
                    into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
                // Negative fetch encodes "no limit".
                let fetch = if limit.fetch >= 0 {
                    Some(limit.fetch as usize)
                } else {
                    None
                };
                Ok(Arc::new(GlobalLimitExec::new(
                    input,
                    limit.skip as usize,
                    fetch,
                )))
            }
            PhysicalPlanType::LocalLimit(limit) => {
                let input: Arc<dyn ExecutionPlan> =
                    into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
                Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
            }
            PhysicalPlanType::Window(window_agg) => {
                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &window_agg.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                let input_schema = input.schema();

                let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
                    .window_expr
                    .iter()
                    .map(|window_expr| {
                        parse_physical_window_expr(
                            window_expr,
                            registry,
                            input_schema.as_ref(),
                            extension_codec,
                        )
                    })
                    .collect::<Result<Vec<_>, _>>()?;

                // Partition keys are parsed only to decide the boolean flag
                // passed to the exec constructors below.
                let partition_keys = window_agg
                    .partition_keys
                    .iter()
                    .map(|expr| {
                        parse_physical_expr(
                            expr,
                            registry,
                            input.schema().as_ref(),
                            extension_codec,
                        )
                    })
                    .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;

                // Presence of an input_order_mode selects the bounded
                // (streaming) window operator; absence the unbounded one.
                if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
                    let input_order_mode = match input_order_mode {
                        window_agg_exec_node::InputOrderMode::Linear(_) => {
                            InputOrderMode::Linear
                        }
                        window_agg_exec_node::InputOrderMode::PartiallySorted(
                            protobuf::PartiallySortedInputOrderMode { columns },
                        ) => InputOrderMode::PartiallySorted(
                            columns.iter().map(|c| *c as usize).collect(),
                        ),
                        window_agg_exec_node::InputOrderMode::Sorted(_) => {
                            InputOrderMode::Sorted
                        }
                    };

                    Ok(Arc::new(BoundedWindowAggExec::try_new(
                        physical_window_expr,
                        input,
                        input_order_mode,
                        !partition_keys.is_empty(),
                    )?))
                } else {
                    Ok(Arc::new(WindowAggExec::try_new(
                        physical_window_expr,
                        input,
                        !partition_keys.is_empty(),
                    )?))
                }
            }
            PhysicalPlanType::Aggregate(hash_agg) => {
                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &hash_agg.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                // Decode the proto enum, then map it onto the engine's
                // AggregateMode.
                let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(
                    |_| {
                        proto_error(format!(
                            "Received a AggregateNode message with unknown AggregateMode {}",
                            hash_agg.mode
                        ))
                    },
                )?;
                let agg_mode: AggregateMode = match mode {
                    protobuf::AggregateMode::Partial => AggregateMode::Partial,
                    protobuf::AggregateMode::Final => AggregateMode::Final,
                    protobuf::AggregateMode::FinalPartitioned => {
                        AggregateMode::FinalPartitioned
                    }
                    protobuf::AggregateMode::Single => AggregateMode::Single,
                    protobuf::AggregateMode::SinglePartitioned => {
                        AggregateMode::SinglePartitioned
                    }
                };

                // Used below to slice the flattened grouping-set bitmap.
                let num_expr = hash_agg.group_expr.len();

                let group_expr = hash_agg
                    .group_expr
                    .iter()
                    .zip(hash_agg.group_expr_name.iter())
                    .map(|(expr, name)| {
                        parse_physical_expr(
                            expr,
                            registry,
                            input.schema().as_ref(),
                            extension_codec,
                        )
                        .map(|expr| (expr, name.to_string()))
                    })
                    .collect::<Result<Vec<_>, _>>()?;

                // Null expressions reuse the same name list as the group
                // expressions (zipped against group_expr_name).
                let null_expr = hash_agg
                    .null_expr
                    .iter()
                    .zip(hash_agg.group_expr_name.iter())
                    .map(|(expr, name)| {
                        parse_physical_expr(
                            expr,
                            registry,
                            input.schema().as_ref(),
                            extension_codec,
                        )
                        .map(|expr| (expr, name.to_string()))
                    })
                    .collect::<Result<Vec<_>, _>>()?;

                // Grouping sets are stored as one flat bool vector; rebuild
                // the per-set rows by chunking on the group-expr count.
                let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
                    hash_agg
                        .groups
                        .chunks(num_expr)
                        .map(|g| g.to_vec())
                        .collect::<Vec<Vec<bool>>>()
                } else {
                    vec![]
                };

                let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
                    DataFusionError::Internal(
                        "input_schema in AggregateNode is missing.".to_owned(),
                    )
                })?;
                let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

                // One optional filter expression per aggregate expression.
                let physical_filter_expr = hash_agg
                    .filter_expr
                    .iter()
                    .map(|expr| {
                        expr.expr
                            .as_ref()
                            .map(|e| {
                                parse_physical_expr(
                                    e,
                                    registry,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .transpose()
                    })
                    .collect::<Result<Vec<_>, _>>()?;

                // Rebuild each aggregate via AggregateExprBuilder. Only UDAF
                // aggregate functions exist on the wire; custom payloads are
                // decoded through the extension codec, otherwise the UDAF is
                // looked up by name in the registry.
                let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
                    .aggr_expr
                    .iter()
                    .zip(hash_agg.aggr_expr_name.iter())
                    .map(|(expr, name)| {
                        let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                            proto_error("Unexpected empty aggregate physical expression")
                        })?;

                        match expr_type {
                            ExprType::AggregateExpr(agg_node) => {
                                let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node.expr.iter()
                                    .map(|e| parse_physical_expr(e, registry, &physical_schema, extension_codec)).collect::<Result<Vec<_>>>()?;
                                let ordering_req: LexOrdering = agg_node.ordering_req.iter()
                                    .map(|e| parse_physical_sort_expr(e, registry, &physical_schema, extension_codec))
                                    .collect::<Result<LexOrdering>>()?;
                                agg_node.aggregate_function.as_ref().map(|func| {
                                    match func {
                                        AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                            let agg_udf = match &agg_node.fun_definition {
                                                Some(buf) => extension_codec.try_decode_udaf(udaf_name, buf)?,
                                                None => registry.udaf(udaf_name)?
                                            };

                                            AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                                .schema(Arc::clone(&physical_schema))
                                                .alias(name)
                                                .with_ignore_nulls(agg_node.ignore_nulls)
                                                .with_distinct(agg_node.distinct)
                                                .order_by(ordering_req)
                                                .build()
                                                .map(Arc::new)
                                        }
                                    }
                                }).transpose()?.ok_or_else(|| {
                                    proto_error("Invalid AggregateExpr, missing aggregate_function")
                                })
                            }
                            _ => internal_err!(
                                "Invalid aggregate expression for AggregateExec"
                            ),
                        }
                    })
                    .collect::<Result<Vec<_>, _>>()?;

                let limit = hash_agg
                    .limit
                    .as_ref()
                    .map(|lit_value| lit_value.limit as usize);

                let agg = AggregateExec::try_new(
                    agg_mode,
                    PhysicalGroupBy::new(group_expr, null_expr, groups),
                    physical_aggr_expr,
                    physical_filter_expr,
                    input,
                    physical_schema,
                )?;

                let agg = agg.with_limit(limit);

                Ok(Arc::new(agg))
            }
            PhysicalPlanType::HashJoin(hashjoin) => {
                let left: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &hashjoin.left,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                let right: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &hashjoin.right,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                let left_schema = left.schema();
                let right_schema = right.schema();
                // Equi-join keys: each side's expression is parsed against its
                // own input schema.
                let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
                    .on
                    .iter()
                    .map(|col| {
                        let left = parse_physical_expr(
                            &col.left.clone().unwrap(),
                            registry,
                            left_schema.as_ref(),
                            extension_codec,
                        )?;
                        let right = parse_physical_expr(
                            &col.right.clone().unwrap(),
                            registry,
                            right_schema.as_ref(),
                            extension_codec,
                        )?;
                        Ok((left, right))
                    })
                    .collect::<Result<_>>()?;
                let join_type = protobuf::JoinType::try_from(hashjoin.join_type)
                    .map_err(|_| {
                        proto_error(format!(
                            "Received a HashJoinNode message with unknown JoinType {}",
                            hashjoin.join_type
                        ))
                    })?;
                // Optional non-equi filter: carries its own intermediate
                // schema plus column indices mapping back to each join side.
                let filter = hashjoin
                    .filter
                    .as_ref()
                    .map(|f| {
                        let schema = f
                            .schema
                            .as_ref()
                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                            .try_into()?;

                        let expression = parse_physical_expr(
                            f.expression.as_ref().ok_or_else(|| {
                                proto_error("Unexpected empty filter expression")
                            })?,
                            registry, &schema,
                            extension_codec,
                        )?;
                        let column_indices = f.column_indices
                            .iter()
                            .map(|i| {
                                let side = protobuf::JoinSide::try_from(i.side)
                                    .map_err(|_| proto_error(format!(
                                        "Received a HashJoinNode message with JoinSide in Filter {}",
                                        i.side))
                                    )?;

                                Ok(ColumnIndex {
                                    index: i.index as usize,
                                    side: side.into(),
                                })
                            })
                            .collect::<Result<Vec<_>>>()?;

                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
                    })
                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

                let partition_mode = protobuf::PartitionMode::try_from(
                    hashjoin.partition_mode,
                )
                .map_err(|_| {
                    proto_error(format!(
                        "Received a HashJoinNode message with unknown PartitionMode {}",
                        hashjoin.partition_mode
                    ))
                })?;
                let partition_mode = match partition_mode {
                    protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
                    protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
                    protobuf::PartitionMode::Auto => PartitionMode::Auto,
                };
                // Empty projection list means "no projection" (None).
                let projection = if !hashjoin.projection.is_empty() {
                    Some(
                        hashjoin
                            .projection
                            .iter()
                            .map(|i| *i as usize)
                            .collect::<Vec<_>>(),
                    )
                } else {
                    None
                };
                Ok(Arc::new(HashJoinExec::try_new(
                    left,
                    right,
                    on,
                    filter,
                    &join_type.into(),
                    projection,
                    partition_mode,
                    hashjoin.null_equals_null,
                )?))
            }
            PhysicalPlanType::SymmetricHashJoin(sym_join) => {
                let left = into_physical_plan(
                    &sym_join.left,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                let right = into_physical_plan(
                    &sym_join.right,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                let left_schema = left.schema();
                let right_schema = right.schema();
                let on = sym_join
                    .on
                    .iter()
                    .map(|col| {
                        let left = parse_physical_expr(
                            &col.left.clone().unwrap(),
                            registry,
                            left_schema.as_ref(),
                            extension_codec,
                        )?;
                        let right = parse_physical_expr(
                            &col.right.clone().unwrap(),
                            registry,
                            right_schema.as_ref(),
                            extension_codec,
                        )?;
                        Ok((left, right))
                    })
                    .collect::<Result<_>>()?;
                let join_type = protobuf::JoinType::try_from(sym_join.join_type)
                    .map_err(|_| {
                        proto_error(format!(
                            "Received a SymmetricHashJoin message with unknown JoinType {}",
                            sym_join.join_type
                        ))
                    })?;
                // Same JoinFilter reconstruction as the HashJoin arm.
                // NOTE(review): the error text below says "HashJoinNode" even
                // though this is the SymmetricHashJoin arm — likely copy-paste;
                // left as-is since it is runtime-visible text.
                let filter = sym_join
                    .filter
                    .as_ref()
                    .map(|f| {
                        let schema = f
                            .schema
                            .as_ref()
                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                            .try_into()?;

                        let expression = parse_physical_expr(
                            f.expression.as_ref().ok_or_else(|| {
                                proto_error("Unexpected empty filter expression")
                            })?,
                            registry, &schema,
                            extension_codec,
                        )?;
                        let column_indices = f.column_indices
                            .iter()
                            .map(|i| {
                                let side = protobuf::JoinSide::try_from(i.side)
                                    .map_err(|_| proto_error(format!(
                                        "Received a HashJoinNode message with JoinSide in Filter {}",
                                        i.side))
                                    )?;

                                Ok(ColumnIndex {
                                    index: i.index as usize,
                                    side: side.into(),
                                })
                            })
                            .collect::<Result<_>>()?;

                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
                    })
                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

                // Empty sort-expression lists decode to None (no ordering).
                let left_sort_exprs = parse_physical_sort_exprs(
                    &sym_join.left_sort_exprs,
                    registry,
                    &left_schema,
                    extension_codec,
                )?;
                let left_sort_exprs = if left_sort_exprs.is_empty() {
                    None
                } else {
                    Some(left_sort_exprs)
                };

                let right_sort_exprs = parse_physical_sort_exprs(
                    &sym_join.right_sort_exprs,
                    registry,
                    &right_schema,
                    extension_codec,
                )?;
                let right_sort_exprs = if right_sort_exprs.is_empty() {
                    None
                } else {
                    Some(right_sort_exprs)
                };

                let partition_mode =
                    protobuf::StreamPartitionMode::try_from(sym_join.partition_mode).map_err(|_| {
                        proto_error(format!(
                            "Received a SymmetricHashJoin message with unknown PartitionMode {}",
                            sym_join.partition_mode
                        ))
                    })?;
                let partition_mode = match partition_mode {
                    protobuf::StreamPartitionMode::SinglePartition => {
                        StreamJoinPartitionMode::SinglePartition
                    }
                    protobuf::StreamPartitionMode::PartitionedExec => {
                        StreamJoinPartitionMode::Partitioned
                    }
                };
                SymmetricHashJoinExec::try_new(
                    left,
                    right,
                    on,
                    filter,
                    &join_type.into(),
                    sym_join.null_equals_null,
                    left_sort_exprs,
                    right_sort_exprs,
                    partition_mode,
                )
                .map(|e| Arc::new(e) as _)
            }
            PhysicalPlanType::Union(union) => {
                let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
                for input in &union.inputs {
                    inputs.push(input.try_into_physical_plan(
                        registry,
                        runtime,
                        extension_codec,
                    )?);
                }
                Ok(Arc::new(UnionExec::new(inputs)))
            }
            PhysicalPlanType::Interleave(interleave) => {
                let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
                for input in &interleave.inputs {
                    inputs.push(input.try_into_physical_plan(
                        registry,
                        runtime,
                        extension_codec,
                    )?);
                }
                Ok(Arc::new(InterleaveExec::try_new(inputs)?))
            }
            PhysicalPlanType::CrossJoin(crossjoin) => {
                let left: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &crossjoin.left,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                let right: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &crossjoin.right,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                Ok(Arc::new(CrossJoinExec::new(left, right)))
            }
            PhysicalPlanType::Empty(empty) => {
                let schema = Arc::new(convert_required!(empty.schema)?);
                Ok(Arc::new(EmptyExec::new(schema)))
            }
            PhysicalPlanType::PlaceholderRow(placeholder) => {
                let schema = Arc::new(convert_required!(placeholder.schema)?);
                Ok(Arc::new(PlaceholderRowExec::new(schema)))
            }
            PhysicalPlanType::Sort(sort) => {
                let input: Arc<dyn ExecutionPlan> =
                    into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
                // Each serialized expression must be the Sort variant; anything
                // else is an internal error.
                let exprs = sort
                    .expr
                    .iter()
                    .map(|expr| {
                        let expr = expr.expr_type.as_ref().ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected expr {self:?}"
                            ))
                        })?;
                        if let ExprType::Sort(sort_expr) = expr {
                            let expr = sort_expr
                                .expr
                                .as_ref()
                                .ok_or_else(|| {
                                    proto_error(format!(
                                        "physical_plan::from_proto() Unexpected sort expr {self:?}"
                                    ))
                                })?
                                .as_ref();
                            Ok(PhysicalSortExpr {
                                expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
                                options: SortOptions {
                                    // Wire format stores ascending; invert for
                                    // the `descending` flag.
                                    descending: !sort_expr.asc,
                                    nulls_first: sort_expr.nulls_first,
                                },
                            })
                        } else {
                            internal_err!(
                                "physical_plan::from_proto() {self:?}"
                            )
                        }
                    })
                    .collect::<Result<LexOrdering, _>>()?;
                // Negative fetch encodes "no limit".
                let fetch = if sort.fetch < 0 {
                    None
                } else {
                    Some(sort.fetch as usize)
                };
                let new_sort = SortExec::new(exprs, input)
                    .with_fetch(fetch)
                    .with_preserve_partitioning(sort.preserve_partitioning);

                Ok(Arc::new(new_sort))
            }
            // Same expression decoding as the Sort arm, but builds a
            // SortPreservingMergeExec.
            PhysicalPlanType::SortPreservingMerge(sort) => {
                let input: Arc<dyn ExecutionPlan> =
                    into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
                let exprs = sort
                    .expr
                    .iter()
                    .map(|expr| {
                        let expr = expr.expr_type.as_ref().ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected expr {self:?}"
                            ))
                        })?;
                        if let ExprType::Sort(sort_expr) = expr {
                            let expr = sort_expr
                                .expr
                                .as_ref()
                                .ok_or_else(|| {
                                    proto_error(format!(
                                        "physical_plan::from_proto() Unexpected sort expr {self:?}"
                                    ))
                                })?
                                .as_ref();
                            Ok(PhysicalSortExpr {
                                expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
                                options: SortOptions {
                                    descending: !sort_expr.asc,
                                    nulls_first: sort_expr.nulls_first,
                                },
                            })
                        } else {
                            internal_err!(
                                "physical_plan::from_proto() {self:?}"
                            )
                        }
                    })
                    .collect::<Result<LexOrdering, _>>()?;
                let fetch = if sort.fetch < 0 {
                    None
                } else {
                    Some(sort.fetch as usize)
                };
                Ok(Arc::new(
                    SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
                ))
            }
            // User-defined node: children are decoded first, then the opaque
            // payload is handed to the extension codec.
            PhysicalPlanType::Extension(extension) => {
                let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
                    .inputs
                    .iter()
                    .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
                    .collect::<Result<_>>()?;

                let extension_node = extension_codec.try_decode(
                    extension.node.as_slice(),
                    &inputs,
                    registry,
                )?;

                Ok(extension_node)
            }
            PhysicalPlanType::NestedLoopJoin(join) => {
                let left: Arc<dyn ExecutionPlan> =
                    into_physical_plan(&join.left, registry, runtime, extension_codec)?;
                let right: Arc<dyn ExecutionPlan> =
                    into_physical_plan(&join.right, registry, runtime, extension_codec)?;
                let join_type =
                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
                        proto_error(format!(
                            "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
                            join.join_type
                        ))
                    })?;
                // Optional join filter — same structure as the hash-join arms.
                let filter = join
                    .filter
                    .as_ref()
                    .map(|f| {
                        let schema = f
                            .schema
                            .as_ref()
                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                            .try_into()?;

                        let expression = parse_physical_expr(
                            f.expression.as_ref().ok_or_else(|| {
                                proto_error("Unexpected empty filter expression")
                            })?,
                            registry, &schema,
                            extension_codec,
                        )?;
                        let column_indices = f.column_indices
                            .iter()
                            .map(|i| {
                                let side = protobuf::JoinSide::try_from(i.side)
                                    .map_err(|_| proto_error(format!(
                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
                                        i.side))
                                    )?;

                                Ok(ColumnIndex {
                                    index: i.index as usize,
                                    side: side.into(),
                                })
                            })
                            .collect::<Result<Vec<_>>>()?;

                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
                    })
                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

                let projection = if !join.projection.is_empty() {
                    Some(
                        join.projection
                            .iter()
                            .map(|i| *i as usize)
                            .collect::<Vec<_>>(),
                    )
                } else {
                    None
                };

                Ok(Arc::new(NestedLoopJoinExec::try_new(
                    left,
                    right,
                    filter,
                    &join_type.into(),
                    projection,
                )?))
            }
            PhysicalPlanType::Analyze(analyze) => {
                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                    &analyze.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;
                Ok(Arc::new(AnalyzeExec::new(
                    analyze.verbose,
                    analyze.show_statistics,
                    input,
                    Arc::new(convert_required!(analyze.schema)?),
                )))
            }
            PhysicalPlanType::JsonSink(sink) => {
                let input =
                    into_physical_plan(&sink.input, registry, runtime, extension_codec)?;

                let data_sink: JsonSink = sink
                    .sink
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                    .try_into()?;
                let sink_schema = input.schema();
                // Optional required sort order for the sink's input.
                let sort_order = sink
                    .sort_order
                    .as_ref()
                    .map(|collection| {
                        parse_physical_sort_exprs(
                            &collection.physical_sort_expr_nodes,
                            registry,
                            &sink_schema,
                            extension_codec,
                        )
                        .map(LexRequirement::from)
                    })
                    .transpose()?;
                Ok(Arc::new(DataSinkExec::new(
                    input,
                    Arc::new(data_sink),
                    sort_order,
                )))
            }
            PhysicalPlanType::CsvSink(sink) => {
                let input =
                    into_physical_plan(&sink.input, registry, runtime, extension_codec)?;

                let data_sink: CsvSink = sink
                    .sink
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                    .try_into()?;
                let sink_schema = input.schema();
                let sort_order = sink
                    .sort_order
                    .as_ref()
                    .map(|collection| {
                        parse_physical_sort_exprs(
                            &collection.physical_sort_expr_nodes,
                            registry,
                            &sink_schema,
                            extension_codec,
                        )
                        .map(LexRequirement::from)
                    })
                    .transpose()?;
                Ok(Arc::new(DataSinkExec::new(
                    input,
                    Arc::new(data_sink),
                    sort_order,
                )))
            }
            // Like the parquet scan arm: compiled only with the `parquet`
            // feature, panics otherwise.
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => {
                #[cfg(feature = "parquet")]
                {
                    let input = into_physical_plan(
                        &sink.input,
                        registry,
                        runtime,
                        extension_codec,
                    )?;

                    let data_sink: ParquetSink = sink
                        .sink
                        .as_ref()
                        .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                        .try_into()?;
                    let sink_schema = input.schema();
                    let sort_order = sink
                        .sort_order
                        .as_ref()
                        .map(|collection| {
                            parse_physical_sort_exprs(
                                &collection.physical_sort_expr_nodes,
                                registry,
                                &sink_schema,
                                extension_codec,
                            )
                            .map(LexRequirement::from)
                        })
                        .transpose()?;
                    Ok(Arc::new(DataSinkExec::new(
                        input,
                        Arc::new(data_sink),
                        sort_order,
                    )))
                }
                #[cfg(not(feature = "parquet"))]
                panic!("Trying to use ParquetSink without `parquet` feature enabled");
            }
            PhysicalPlanType::Unnest(unnest) => {
                let input = into_physical_plan(
                    &unnest.input,
                    registry,
                    runtime,
                    extension_codec,
                )?;

                Ok(Arc::new(UnnestExec::new(
                    input,
                    unnest
                        .list_type_columns
                        .iter()
                        .map(|c| ListUnnest {
                            index_in_input_schema: c.index_in_input_schema as _,
                            depth: c.depth as _,
                        })
                        .collect(),
                    unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
                    Arc::new(convert_required!(unnest.schema)?),
                    into_required!(unnest.options)?,
                )))
            }
        }
    }
1175
    /// Serializes an [`ExecutionPlan`] into a [`protobuf::PhysicalPlanNode`].
    ///
    /// The plan is matched against each supported concrete operator via
    /// `as_any()` downcasts, in order; the first match wins. Plans that match
    /// no built-in operator are handed to `extension_codec.try_encode` and
    /// wrapped in a `PhysicalExtensionNode` (with recursively serialized
    /// children). Returns an error if neither path succeeds.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension-codec fallback at the bottom;
        // `plan` itself is shadowed by the `&dyn Any` used for downcasting.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Explain(
                    protobuf::ExplainExecNode {
                        schema: Some(exec.schema().as_ref().try_into()?),
                        stringified_plans: exec
                            .stringified_plans()
                            .iter()
                            .map(|plan| plan.into())
                            .collect(),
                        verbose: exec.verbose(),
                    },
                )),
            });
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
            // Projection expressions are (expr, output-name) pairs; the two
            // halves are serialized into parallel vectors.
            let expr = exec
                .expr()
                .iter()
                .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
                .collect::<Result<Vec<_>>>()?;
            let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
                    protobuf::ProjectionExecNode {
                        input: Some(Box::new(input)),
                        expr,
                        expr_name,
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
                    protobuf::AnalyzeExecNode {
                        verbose: exec.verbose(),
                        show_statistics: exec.show_statistics(),
                        input: Some(Box::new(input)),
                        schema: Some(exec.schema().as_ref().try_into()?),
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
                    protobuf::FilterExecNode {
                        input: Some(Box::new(input)),
                        expr: Some(serialize_physical_expr(
                            exec.predicate(),
                            extension_codec,
                        )?),
                        default_filter_selectivity: exec.default_selectivity() as u32,
                        // An absent projection is encoded as an empty vector.
                        projection: exec
                            .projection()
                            .as_ref()
                            .map_or_else(Vec::new, |v| {
                                v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                            }),
                    },
                ))),
            });
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                limit.input().to_owned(),
                extension_codec,
            )?;

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
                    protobuf::GlobalLimitExecNode {
                        input: Some(Box::new(input)),
                        skip: limit.skip() as u32,
                        // -1 is the sentinel for "no fetch limit".
                        fetch: match limit.fetch() {
                            Some(n) => n as i64,
                            _ => -1,
                        },
                    },
                ))),
            });
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                limit.input().to_owned(),
                extension_codec,
            )?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
                    protobuf::LocalLimitExecNode {
                        input: Some(Box::new(input)),
                        fetch: limit.fetch() as u32,
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.left().to_owned(),
                extension_codec,
            )?;
            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.right().to_owned(),
                extension_codec,
            )?;
            // Equi-join keys: one (left expr, right expr) pair per key column.
            let on: Vec<protobuf::JoinOn> = exec
                .on()
                .iter()
                .map(|tuple| {
                    let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                    let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                    Ok::<_, DataFusionError>(protobuf::JoinOn {
                        left: Some(l),
                        right: Some(r),
                    })
                })
                .collect::<Result<_>>()?;
            let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
            // Optional non-equi filter: expression + column indices into the
            // filter's intermediate schema, tagged with the join side.
            let filter = exec
                .filter()
                .as_ref()
                .map(|f| {
                    let expression =
                        serialize_physical_expr(f.expression(), extension_codec)?;
                    let column_indices = f
                        .column_indices()
                        .iter()
                        .map(|i| {
                            let side: protobuf::JoinSide = i.side.to_owned().into();
                            protobuf::ColumnIndex {
                                index: i.index as u32,
                                side: side.into(),
                            }
                        })
                        .collect();
                    let schema = f.schema().as_ref().try_into()?;
                    Ok(protobuf::JoinFilter {
                        expression: Some(expression),
                        column_indices,
                        schema: Some(schema),
                    })
                })
                .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

            let partition_mode = match exec.partition_mode() {
                PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
                PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
                PartitionMode::Auto => protobuf::PartitionMode::Auto,
            };

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
                    protobuf::HashJoinExecNode {
                        left: Some(Box::new(left)),
                        right: Some(Box::new(right)),
                        on,
                        join_type: join_type.into(),
                        partition_mode: partition_mode.into(),
                        null_equals_null: exec.null_equals_null(),
                        filter,
                        projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
                            v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                        }),
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.left().to_owned(),
                extension_codec,
            )?;
            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.right().to_owned(),
                extension_codec,
            )?;
            let on = exec
                .on()
                .iter()
                .map(|tuple| {
                    let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                    let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                    Ok::<_, DataFusionError>(protobuf::JoinOn {
                        left: Some(l),
                        right: Some(r),
                    })
                })
                .collect::<Result<_>>()?;
            let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
            let filter = exec
                .filter()
                .as_ref()
                .map(|f| {
                    let expression =
                        serialize_physical_expr(f.expression(), extension_codec)?;
                    let column_indices = f
                        .column_indices()
                        .iter()
                        .map(|i| {
                            let side: protobuf::JoinSide = i.side.to_owned().into();
                            protobuf::ColumnIndex {
                                index: i.index as u32,
                                side: side.into(),
                            }
                        })
                        .collect();
                    let schema = f.schema().as_ref().try_into()?;
                    Ok(protobuf::JoinFilter {
                        expression: Some(expression),
                        column_indices,
                        schema: Some(schema),
                    })
                })
                .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

            let partition_mode = match exec.partition_mode() {
                StreamJoinPartitionMode::SinglePartition => {
                    protobuf::StreamPartitionMode::SinglePartition
                }
                StreamJoinPartitionMode::Partitioned => {
                    protobuf::StreamPartitionMode::PartitionedExec
                }
            };

            // Optional per-side sort orderings; absence is encoded as an
            // empty vector.
            let left_sort_exprs = exec
                .left_sort_exprs()
                .map(|exprs| {
                    exprs
                        .iter()
                        .map(|expr| {
                            Ok(protobuf::PhysicalSortExprNode {
                                expr: Some(Box::new(serialize_physical_expr(
                                    &expr.expr,
                                    extension_codec,
                                )?)),
                                asc: !expr.options.descending,
                                nulls_first: expr.options.nulls_first,
                            })
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .transpose()?
                .unwrap_or(vec![]);

            let right_sort_exprs = exec
                .right_sort_exprs()
                .map(|exprs| {
                    exprs
                        .iter()
                        .map(|expr| {
                            Ok(protobuf::PhysicalSortExprNode {
                                expr: Some(Box::new(serialize_physical_expr(
                                    &expr.expr,
                                    extension_codec,
                                )?)),
                                asc: !expr.options.descending,
                                nulls_first: expr.options.nulls_first,
                            })
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .transpose()?
                .unwrap_or(vec![]);

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
                    protobuf::SymmetricHashJoinExecNode {
                        left: Some(Box::new(left)),
                        right: Some(Box::new(right)),
                        on,
                        join_type: join_type.into(),
                        partition_mode: partition_mode.into(),
                        null_equals_null: exec.null_equals_null(),
                        left_sort_exprs,
                        right_sort_exprs,
                        filter,
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.left().to_owned(),
                extension_codec,
            )?;
            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.right().to_owned(),
                extension_codec,
            )?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
                    protobuf::CrossJoinExecNode {
                        left: Some(Box::new(left)),
                        right: Some(Box::new(right)),
                    },
                ))),
            });
        }
        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            // Flattened grouping-set membership matrix (row-major).
            let groups: Vec<bool> = exec
                .group_expr()
                .groups()
                .iter()
                .flatten()
                .copied()
                .collect();

            let group_names = exec
                .group_expr()
                .expr()
                .iter()
                .map(|expr| expr.1.to_owned())
                .collect();

            let filter = exec
                .filter_expr()
                .iter()
                .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
                .collect::<Result<Vec<_>>>()?;

            let agg = exec
                .aggr_expr()
                .iter()
                .map(|expr| {
                    serialize_physical_aggr_expr(expr.to_owned(), extension_codec)
                })
                .collect::<Result<Vec<_>>>()?;

            let agg_names = exec
                .aggr_expr()
                .iter()
                .map(|expr| expr.name().to_string())
                .collect::<Vec<_>>();

            let agg_mode = match exec.mode() {
                AggregateMode::Partial => protobuf::AggregateMode::Partial,
                AggregateMode::Final => protobuf::AggregateMode::Final,
                AggregateMode::FinalPartitioned => {
                    protobuf::AggregateMode::FinalPartitioned
                }
                AggregateMode::Single => protobuf::AggregateMode::Single,
                AggregateMode::SinglePartitioned => {
                    protobuf::AggregateMode::SinglePartitioned
                }
            };
            let input_schema = exec.input_schema();
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;

            let null_expr = exec
                .group_expr()
                .null_expr()
                .iter()
                .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
                .collect::<Result<Vec<_>>>()?;

            let group_expr = exec
                .group_expr()
                .expr()
                .iter()
                .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
                .collect::<Result<Vec<_>>>()?;

            let limit = exec.limit().map(|value| protobuf::AggLimit {
                limit: value as u64,
            });

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                    protobuf::AggregateExecNode {
                        group_expr,
                        group_expr_name: group_names,
                        aggr_expr: agg,
                        filter_expr: filter,
                        aggr_expr_name: agg_names,
                        mode: agg_mode as i32,
                        input: Some(Box::new(input)),
                        input_schema: Some(input_schema.as_ref().try_into()?),
                        null_expr,
                        groups,
                        limit,
                    },
                ))),
            });
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            let schema = empty.schema().as_ref().try_into()?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Empty(
                    protobuf::EmptyExecNode {
                        schema: Some(schema),
                    },
                )),
            });
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            let schema = empty.schema().as_ref().try_into()?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
                    protobuf::PlaceholderRowExecNode {
                        schema: Some(schema),
                    },
                )),
            });
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                coalesce_batches.input().to_owned(),
                extension_codec,
            )?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
                    protobuf::CoalesceBatchesExecNode {
                        input: Some(Box::new(input)),
                        target_batch_size: coalesce_batches.target_batch_size() as u32,
                        fetch: coalesce_batches.fetch().map(|n| n as u32),
                    },
                ))),
            });
        }

        // DataSourceExec is probed three times (CSV, Parquet, Avro); each probe
        // only returns if the inner file source matches its format.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            let data_source = data_source_exec.data_source();
            if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>()
            {
                let source = maybe_csv.file_source();
                if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                    return Ok(protobuf::PhysicalPlanNode {
                        physical_plan_type: Some(PhysicalPlanType::CsvScan(
                            protobuf::CsvScanExecNode {
                                base_conf: Some(serialize_file_scan_config(
                                    maybe_csv,
                                    extension_codec,
                                )?),
                                has_header: csv_config.has_header(),
                                delimiter: byte_to_string(
                                    csv_config.delimiter(),
                                    "delimiter",
                                )?,
                                quote: byte_to_string(csv_config.quote(), "quote")?,
                                optional_escape: if let Some(escape) = csv_config.escape()
                                {
                                    Some(
                                        protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                            byte_to_string(escape, "escape")?,
                                        ),
                                    )
                                } else {
                                    None
                                },
                                optional_comment: if let Some(comment) =
                                    csv_config.comment()
                                {
                                    Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                        byte_to_string(comment, "comment")?,
                                    ))
                                } else {
                                    None
                                },
                                newlines_in_values: maybe_csv.newlines_in_values(),
                            },
                        )),
                    });
                }
            }
        }

        #[cfg(feature = "parquet")]
        if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
            let data_source_exec = exec.data_source();
            if let Some(maybe_parquet) =
                data_source_exec.as_any().downcast_ref::<FileScanConfig>()
            {
                let source = maybe_parquet.file_source();
                if let Some(conf) = source.as_any().downcast_ref::<ParquetSource>() {
                    let predicate = conf
                        .predicate()
                        .map(|pred| serialize_physical_expr(pred, extension_codec))
                        .transpose()?;
                    return Ok(protobuf::PhysicalPlanNode {
                        physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                            protobuf::ParquetScanExecNode {
                                base_conf: Some(serialize_file_scan_config(
                                    maybe_parquet,
                                    extension_codec,
                                )?),
                                predicate,
                                parquet_options: Some(
                                    conf.table_parquet_options().try_into()?,
                                ),
                            },
                        )),
                    });
                }
            }
        }

        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            let data_source = data_source_exec.data_source();
            if let Some(maybe_avro) =
                data_source.as_any().downcast_ref::<FileScanConfig>()
            {
                let source = maybe_avro.file_source();
                if source.as_any().downcast_ref::<AvroSource>().is_some() {
                    return Ok(protobuf::PhysicalPlanNode {
                        physical_plan_type: Some(PhysicalPlanType::AvroScan(
                            protobuf::AvroScanExecNode {
                                base_conf: Some(serialize_file_scan_config(
                                    maybe_avro,
                                    extension_codec,
                                )?),
                            },
                        )),
                    });
                }
            }
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
                    protobuf::CoalescePartitionsExecNode {
                        input: Some(Box::new(input)),
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;

            let pb_partitioning =
                serialize_partitioning(exec.partitioning(), extension_codec)?;

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
                    protobuf::RepartitionExecNode {
                        input: Some(Box::new(input)),
                        partitioning: Some(pb_partitioning),
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
            let expr = exec
                .expr()
                .iter()
                .map(|expr| {
                    let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
                        expr: Some(Box::new(serialize_physical_expr(
                            &expr.expr,
                            extension_codec,
                        )?)),
                        asc: !expr.options.descending,
                        nulls_first: expr.options.nulls_first,
                    });
                    Ok(protobuf::PhysicalExprNode {
                        expr_type: Some(ExprType::Sort(sort_expr)),
                    })
                })
                .collect::<Result<Vec<_>>>()?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
                    protobuf::SortExecNode {
                        input: Some(Box::new(input)),
                        expr,
                        // -1 is the sentinel for "no fetch limit".
                        fetch: match exec.fetch() {
                            Some(n) => n as i64,
                            _ => -1,
                        },
                        preserve_partitioning: exec.preserve_partitioning(),
                    },
                ))),
            });
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
            for input in union.inputs() {
                inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
                    input.to_owned(),
                    extension_codec,
                )?);
            }
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Union(
                    protobuf::UnionExecNode { inputs },
                )),
            });
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
            for input in interleave.inputs() {
                inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
                    input.to_owned(),
                    extension_codec,
                )?);
            }
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Interleave(
                    protobuf::InterleaveExecNode { inputs },
                )),
            });
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
            let expr = exec
                .expr()
                .iter()
                .map(|expr| {
                    let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
                        expr: Some(Box::new(serialize_physical_expr(
                            &expr.expr,
                            extension_codec,
                        )?)),
                        asc: !expr.options.descending,
                        nulls_first: expr.options.nulls_first,
                    });
                    Ok(protobuf::PhysicalExprNode {
                        expr_type: Some(ExprType::Sort(sort_expr)),
                    })
                })
                .collect::<Result<Vec<_>>>()?;
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(
                    Box::new(protobuf::SortPreservingMergeExecNode {
                        input: Some(Box::new(input)),
                        expr,
                        fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
                    }),
                )),
            });
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.left().to_owned(),
                extension_codec,
            )?;
            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.right().to_owned(),
                extension_codec,
            )?;

            let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
            let filter = exec
                .filter()
                .as_ref()
                .map(|f| {
                    let expression =
                        serialize_physical_expr(f.expression(), extension_codec)?;
                    let column_indices = f
                        .column_indices()
                        .iter()
                        .map(|i| {
                            let side: protobuf::JoinSide = i.side.to_owned().into();
                            protobuf::ColumnIndex {
                                index: i.index as u32,
                                side: side.into(),
                            }
                        })
                        .collect();
                    let schema = f.schema().as_ref().try_into()?;
                    Ok(protobuf::JoinFilter {
                        expression: Some(expression),
                        column_indices,
                        schema: Some(schema),
                    })
                })
                .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
                    protobuf::NestedLoopJoinExecNode {
                        left: Some(Box::new(left)),
                        right: Some(Box::new(right)),
                        join_type: join_type.into(),
                        filter,
                        projection: exec.projection().map_or_else(Vec::new, |v| {
                            v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                        }),
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;

            let window_expr = exec
                .window_expr()
                .iter()
                .map(|e| serialize_physical_window_expr(e, extension_codec))
                .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

            let partition_keys = exec
                .partition_keys()
                .iter()
                .map(|e| serialize_physical_expr(e, extension_codec))
                .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
                    protobuf::WindowAggExecNode {
                        input: Some(Box::new(input)),
                        window_expr,
                        partition_keys,
                        // `None` marks the unbounded (WindowAggExec) variant;
                        // BoundedWindowAggExec below always sets Some(..).
                        input_order_mode: None,
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;

            let window_expr = exec
                .window_expr()
                .iter()
                .map(|e| serialize_physical_window_expr(e, extension_codec))
                .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

            let partition_keys = exec
                .partition_keys()
                .iter()
                .map(|e| serialize_physical_expr(e, extension_codec))
                .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

            let input_order_mode = match &exec.input_order_mode {
                InputOrderMode::Linear => window_agg_exec_node::InputOrderMode::Linear(
                    protobuf::EmptyMessage {},
                ),
                InputOrderMode::PartiallySorted(columns) => {
                    window_agg_exec_node::InputOrderMode::PartiallySorted(
                        protobuf::PartiallySortedInputOrderMode {
                            columns: columns.iter().map(|c| *c as u64).collect(),
                        },
                    )
                }
                InputOrderMode::Sorted => window_agg_exec_node::InputOrderMode::Sorted(
                    protobuf::EmptyMessage {},
                ),
            };

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
                    protobuf::WindowAggExecNode {
                        input: Some(Box::new(input)),
                        window_expr,
                        partition_keys,
                        input_order_mode: Some(input_order_mode),
                    },
                ))),
            });
        }

        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
            let sort_order = match exec.sort_order() {
                Some(requirements) => {
                    let expr = requirements
                        .iter()
                        .map(|requirement| {
                            let expr: PhysicalSortExpr = requirement.to_owned().into();
                            let sort_expr = protobuf::PhysicalSortExprNode {
                                expr: Some(Box::new(serialize_physical_expr(
                                    &expr.expr,
                                    extension_codec,
                                )?)),
                                asc: !expr.options.descending,
                                nulls_first: expr.options.nulls_first,
                            };
                            Ok(sort_expr)
                        })
                        .collect::<Result<Vec<_>>>()?;
                    Some(protobuf::PhysicalSortExprNodeCollection {
                        physical_sort_expr_nodes: expr,
                    })
                }
                None => None,
            };

            if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
                return Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                        protobuf::JsonSinkExecNode {
                            input: Some(Box::new(input)),
                            sink: Some(sink.try_into()?),
                            sink_schema: Some(exec.schema().as_ref().try_into()?),
                            sort_order,
                        },
                    ))),
                });
            }

            if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
                return Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                        protobuf::CsvSinkExecNode {
                            input: Some(Box::new(input)),
                            sink: Some(sink.try_into()?),
                            sink_schema: Some(exec.schema().as_ref().try_into()?),
                            sort_order,
                        },
                    ))),
                });
            }

            #[cfg(feature = "parquet")]
            if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
                return Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                        protobuf::ParquetSinkExecNode {
                            input: Some(Box::new(input)),
                            sink: Some(sink.try_into()?),
                            sink_schema: Some(exec.schema().as_ref().try_into()?),
                            sort_order,
                        },
                    ))),
                });
            }

            // Unknown sink types fall through to the extension codec below.
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;

            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
                    protobuf::UnnestExecNode {
                        input: Some(Box::new(input)),
                        schema: Some(exec.schema().try_into()?),
                        list_type_columns: exec
                            .list_column_indices()
                            .iter()
                            .map(|c| ProtoListUnnest {
                                index_in_input_schema: c.index_in_input_schema as _,
                                depth: c.depth as _,
                            })
                            .collect(),
                        struct_type_columns: exec
                            .struct_column_indices()
                            .iter()
                            .map(|c| *c as _)
                            .collect(),
                        options: Some(exec.options().into()),
                    },
                ))),
            });
        }

        // Fallback: let the extension codec encode the unknown plan; its
        // children are serialized recursively so round-tripping can rebuild
        // the subtree.
        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
2119}
2120
/// Bidirectional conversion between a serialized (protobuf) plan
/// representation and a runtime [`ExecutionPlan`].
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes `Self` from a protobuf-encoded byte buffer.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` as protobuf bytes into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this serialized representation into an executable plan.
    ///
    /// `registry` resolves UDFs/UDAFs/UDWFs referenced by the plan;
    /// `extension_codec` decodes any extension nodes and expressions.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the serialized representation from an executable plan,
    /// delegating unknown plan types to `extension_codec`.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
2145
/// Codec for (de)serializing plan nodes, expressions, and user-defined
/// functions that the built-in protobuf representation does not cover.
///
/// All `try_decode_*`/`try_encode_expr` methods default to
/// "not implemented" errors, while the UDF/UDAF/UDWF encoders default to
/// a no-op `Ok(())` — implementors override only what they support.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom `ExecutionPlan` from `buf`, given its already-decoded
    /// child plans and a registry for resolving functions.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes a custom `ExecutionPlan` into `buf` (children are handled by
    /// the caller).
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF by `name`; errors by default.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF; a no-op by default.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom physical expression from `buf`, given its
    /// already-decoded child expressions; errors by default.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a custom physical expression; errors by default.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF by `name`; errors by default.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF; a no-op by default.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF by `name`; errors by default.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF; a no-op by default.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
2198
/// A [`PhysicalExtensionCodec`] that supports no extension nodes: both
/// `try_decode` and `try_encode` return "not implemented" errors. Useful when
/// a codec is required but no custom plans are expected.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
2201
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    // Always errors: this codec cannot decode any extension plan node.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    // Always errors: this codec cannot encode any extension plan node.
    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
2220
2221fn into_physical_plan(
2222 node: &Option<Box<protobuf::PhysicalPlanNode>>,
2223 registry: &dyn FunctionRegistry,
2224 runtime: &RuntimeEnv,
2225 extension_codec: &dyn PhysicalExtensionCodec,
2226) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2227 if let Some(field) = node {
2228 field.try_into_physical_plan(registry, runtime, extension_codec)
2229 } else {
2230 Err(proto_error("Missing required field in protobuf"))
2231 }
2232}