1use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25 parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26 parse_physical_window_expr, parse_protobuf_file_scan_config,
27 parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31 serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37 self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::{Schema, SchemaRef};
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53 CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::coop::CooperativeExec;
68use datafusion::physical_plan::empty::EmptyExec;
69use datafusion::physical_plan::explain::ExplainExec;
70use datafusion::physical_plan::expressions::PhysicalSortExpr;
71use datafusion::physical_plan::filter::FilterExec;
72use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
73use datafusion::physical_plan::joins::{
74 CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
75};
76use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
77use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
78use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
79use datafusion::physical_plan::projection::ProjectionExec;
80use datafusion::physical_plan::repartition::RepartitionExec;
81use datafusion::physical_plan::sorts::sort::SortExec;
82use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
83use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
84use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
85use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
86use datafusion::physical_plan::{
87 ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
88};
89use datafusion_common::config::TableParquetOptions;
90use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
91use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
92
93use prost::bytes::BufMut;
94use prost::Message;
95
96pub mod from_proto;
97pub mod to_proto;
98
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    /// Decodes a `PhysicalPlanNode` from a protobuf-encoded byte slice,
    /// mapping any prost decode failure to a `DataFusionError::Internal`.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
        })
    }

    /// Encodes this node into `buf` via prost, mapping any encode failure
    /// to a `DataFusionError::Internal`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
        })
    }

    /// Converts this protobuf node into an executable [`ExecutionPlan`].
    ///
    /// Errors if the `physical_plan_type` oneof is unset; otherwise
    /// dispatches on the variant, delegating each case to a dedicated
    /// `try_into_*_physical_plan` helper on `protobuf::PhysicalPlanNode`.
    /// Parquet/Avro variants are feature-gated; the `cfg_attr` silences
    /// unused-variable warnings when the feature is off.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        match plan {
            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
                explain,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Projection(projection) => self
                .try_into_projection_physical_plan(
                    projection,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
                filter,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetScan(scan) => self
                .try_into_parquet_scan_physical_plan(
                    scan,
                    registry,
                    runtime,
                    extension_codec,
                ),
            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
            PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
                merge,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Repartition(repart) => self
                .try_into_repartition_physical_plan(
                    repart,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::GlobalLimit(limit) => self
                .try_into_global_limit_physical_plan(
                    limit,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::LocalLimit(limit) => self
                .try_into_local_limit_physical_plan(
                    limit,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
                window_agg,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Aggregate(hash_agg) => self
                .try_into_aggregate_physical_plan(
                    hash_agg,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::HashJoin(hashjoin) => self
                .try_into_hash_join_physical_plan(
                    hashjoin,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
                union,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Interleave(interleave) => self
                .try_into_interleave_physical_plan(
                    interleave,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::CrossJoin(crossjoin) => self
                .try_into_cross_join_physical_plan(
                    crossjoin,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
                empty,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::PlaceholderRow(placeholder) => self
                .try_into_placeholder_row_physical_plan(
                    placeholder,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(
                    sort,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Extension(extension) => self
                .try_into_extension_physical_plan(
                    extension,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::NestedLoopJoin(join) => self
                .try_into_nested_loop_join_physical_plan(
                    join,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
                analyze,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
                sink,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
                sink,
                registry,
                runtime,
                extension_codec,
            ),
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => self
                .try_into_parquet_sink_physical_plan(
                    sink,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
                unnest,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Cooperative(cooperative) => self
                .try_into_cooperative_physical_plan(
                    cooperative,
                    registry,
                    runtime,
                    extension_codec,
                ),
        }
    }

    /// Serializes an [`ExecutionPlan`] into a `PhysicalPlanNode`.
    ///
    /// Tries each known executor type via `downcast_ref` in turn; the first
    /// match delegates to a `try_from_*_exec` helper. `DataSourceExec` and
    /// `DataSinkExec` may decline (return `Ok(None)`), in which case the
    /// cascade falls through to the extension-codec path at the bottom.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension-codec fallback; `plan` is
        // consumed below to obtain the `&dyn Any` used for downcasting.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        // `try_from_data_source_exec` may return Ok(None) for sources it does
        // not recognize; in that case continue down the cascade.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        // Like DataSourceExec above, the sink conversion may decline.
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                extension_codec,
            );
        }

        // Unknown executor type: defer to the user-provided extension codec,
        // serializing the children recursively and wrapping the opaque bytes
        // in a PhysicalExtensionNode.
        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
557
558impl protobuf::PhysicalPlanNode {
559 fn try_into_explain_physical_plan(
560 &self,
561 explain: &protobuf::ExplainExecNode,
562 _registry: &dyn FunctionRegistry,
563 _runtime: &RuntimeEnv,
564 _extension_codec: &dyn PhysicalExtensionCodec,
565 ) -> Result<Arc<dyn ExecutionPlan>> {
566 Ok(Arc::new(ExplainExec::new(
567 Arc::new(explain.schema.as_ref().unwrap().try_into()?),
568 explain
569 .stringified_plans
570 .iter()
571 .map(|plan| plan.into())
572 .collect(),
573 explain.verbose,
574 )))
575 }
576
577 fn try_into_projection_physical_plan(
578 &self,
579 projection: &protobuf::ProjectionExecNode,
580 registry: &dyn FunctionRegistry,
581 runtime: &RuntimeEnv,
582 extension_codec: &dyn PhysicalExtensionCodec,
583 ) -> Result<Arc<dyn ExecutionPlan>> {
584 let input: Arc<dyn ExecutionPlan> =
585 into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
586 let exprs = projection
587 .expr
588 .iter()
589 .zip(projection.expr_name.iter())
590 .map(|(expr, name)| {
591 Ok((
592 parse_physical_expr(
593 expr,
594 registry,
595 input.schema().as_ref(),
596 extension_codec,
597 )?,
598 name.to_string(),
599 ))
600 })
601 .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
602 Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
603 }
604
605 fn try_into_filter_physical_plan(
606 &self,
607 filter: &protobuf::FilterExecNode,
608 registry: &dyn FunctionRegistry,
609 runtime: &RuntimeEnv,
610 extension_codec: &dyn PhysicalExtensionCodec,
611 ) -> Result<Arc<dyn ExecutionPlan>> {
612 let input: Arc<dyn ExecutionPlan> =
613 into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
614 let projection = if !filter.projection.is_empty() {
615 Some(
616 filter
617 .projection
618 .iter()
619 .map(|i| *i as usize)
620 .collect::<Vec<_>>(),
621 )
622 } else {
623 None
624 };
625
626 let predicate_schema = if let Some(ref proj_indices) = projection {
628 let projected_fields: Vec<_> = proj_indices
630 .iter()
631 .map(|&i| input.schema().field(i).clone())
632 .collect();
633 Arc::new(Schema::new(projected_fields))
634 } else {
635 input.schema()
636 };
637
638 let predicate = filter
639 .expr
640 .as_ref()
641 .map(|expr| {
642 parse_physical_expr(
643 expr,
644 registry,
645 predicate_schema.as_ref(),
646 extension_codec,
647 )
648 })
649 .transpose()?
650 .ok_or_else(|| {
651 DataFusionError::Internal(
652 "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
653 )
654 })?;
655 let filter_selectivity = filter.default_filter_selectivity.try_into();
656 let filter =
657 FilterExec::try_new(predicate, input)?.with_projection(projection)?;
658 match filter_selectivity {
659 Ok(filter_selectivity) => Ok(Arc::new(
660 filter.with_default_selectivity(filter_selectivity)?,
661 )),
662 Err(_) => Err(DataFusionError::Internal(
663 "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
664 )),
665 }
666 }
667
668 fn try_into_csv_scan_physical_plan(
669 &self,
670 scan: &protobuf::CsvScanExecNode,
671 registry: &dyn FunctionRegistry,
672 _runtime: &RuntimeEnv,
673 extension_codec: &dyn PhysicalExtensionCodec,
674 ) -> Result<Arc<dyn ExecutionPlan>> {
675 let escape =
676 if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
677 &scan.optional_escape
678 {
679 Some(str_to_byte(escape, "escape")?)
680 } else {
681 None
682 };
683
684 let comment = if let Some(
685 protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
686 ) = &scan.optional_comment
687 {
688 Some(str_to_byte(comment, "comment")?)
689 } else {
690 None
691 };
692
693 let source = Arc::new(
694 CsvSource::new(
695 scan.has_header,
696 str_to_byte(&scan.delimiter, "delimiter")?,
697 0,
698 )
699 .with_escape(escape)
700 .with_comment(comment),
701 );
702
703 let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
704 scan.base_conf.as_ref().unwrap(),
705 registry,
706 extension_codec,
707 source,
708 )?)
709 .with_newlines_in_values(scan.newlines_in_values)
710 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
711 .build();
712 Ok(DataSourceExec::from_data_source(conf))
713 }
714
715 fn try_into_json_scan_physical_plan(
716 &self,
717 scan: &protobuf::JsonScanExecNode,
718 registry: &dyn FunctionRegistry,
719 _runtime: &RuntimeEnv,
720 extension_codec: &dyn PhysicalExtensionCodec,
721 ) -> Result<Arc<dyn ExecutionPlan>> {
722 let scan_conf = parse_protobuf_file_scan_config(
723 scan.base_conf.as_ref().unwrap(),
724 registry,
725 extension_codec,
726 Arc::new(JsonSource::new()),
727 )?;
728 Ok(DataSourceExec::from_data_source(scan_conf))
729 }
730
731 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
732 fn try_into_parquet_scan_physical_plan(
733 &self,
734 scan: &protobuf::ParquetScanExecNode,
735 registry: &dyn FunctionRegistry,
736 _runtime: &RuntimeEnv,
737 extension_codec: &dyn PhysicalExtensionCodec,
738 ) -> Result<Arc<dyn ExecutionPlan>> {
739 #[cfg(feature = "parquet")]
740 {
741 let schema =
742 parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
743
744 let base_conf = scan.base_conf.as_ref().unwrap();
746 let predicate_schema = if !base_conf.projection.is_empty() {
747 let projected_fields: Vec<_> = base_conf
749 .projection
750 .iter()
751 .map(|&i| schema.field(i as usize).clone())
752 .collect();
753 Arc::new(Schema::new(projected_fields))
754 } else {
755 schema
756 };
757
758 let predicate = scan
759 .predicate
760 .as_ref()
761 .map(|expr| {
762 parse_physical_expr(
763 expr,
764 registry,
765 predicate_schema.as_ref(),
766 extension_codec,
767 )
768 })
769 .transpose()?;
770 let mut options = TableParquetOptions::default();
771
772 if let Some(table_options) = scan.parquet_options.as_ref() {
773 options = table_options.try_into()?;
774 }
775 let mut source = ParquetSource::new(options);
776
777 if let Some(predicate) = predicate {
778 source = source.with_predicate(predicate);
779 }
780 let base_config = parse_protobuf_file_scan_config(
781 base_conf,
782 registry,
783 extension_codec,
784 Arc::new(source),
785 )?;
786 Ok(DataSourceExec::from_data_source(base_config))
787 }
788 #[cfg(not(feature = "parquet"))]
789 panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
790 }
791
792 #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
793 fn try_into_avro_scan_physical_plan(
794 &self,
795 scan: &protobuf::AvroScanExecNode,
796 registry: &dyn FunctionRegistry,
797 _runtime: &RuntimeEnv,
798 extension_codec: &dyn PhysicalExtensionCodec,
799 ) -> Result<Arc<dyn ExecutionPlan>> {
800 #[cfg(feature = "avro")]
801 {
802 let conf = parse_protobuf_file_scan_config(
803 scan.base_conf.as_ref().unwrap(),
804 registry,
805 extension_codec,
806 Arc::new(AvroSource::new()),
807 )?;
808 Ok(DataSourceExec::from_data_source(conf))
809 }
810 #[cfg(not(feature = "avro"))]
811 panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
812 }
813
814 fn try_into_coalesce_batches_physical_plan(
815 &self,
816 coalesce_batches: &protobuf::CoalesceBatchesExecNode,
817 registry: &dyn FunctionRegistry,
818 runtime: &RuntimeEnv,
819 extension_codec: &dyn PhysicalExtensionCodec,
820 ) -> Result<Arc<dyn ExecutionPlan>> {
821 let input: Arc<dyn ExecutionPlan> = into_physical_plan(
822 &coalesce_batches.input,
823 registry,
824 runtime,
825 extension_codec,
826 )?;
827 Ok(Arc::new(
828 CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
829 .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
830 ))
831 }
832
833 fn try_into_merge_physical_plan(
834 &self,
835 merge: &protobuf::CoalescePartitionsExecNode,
836 registry: &dyn FunctionRegistry,
837 runtime: &RuntimeEnv,
838 extension_codec: &dyn PhysicalExtensionCodec,
839 ) -> Result<Arc<dyn ExecutionPlan>> {
840 let input: Arc<dyn ExecutionPlan> =
841 into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
842 Ok(Arc::new(
843 CoalescePartitionsExec::new(input)
844 .with_fetch(merge.fetch.map(|f| f as usize)),
845 ))
846 }
847
848 fn try_into_repartition_physical_plan(
849 &self,
850 repart: &protobuf::RepartitionExecNode,
851 registry: &dyn FunctionRegistry,
852 runtime: &RuntimeEnv,
853 extension_codec: &dyn PhysicalExtensionCodec,
854 ) -> Result<Arc<dyn ExecutionPlan>> {
855 let input: Arc<dyn ExecutionPlan> =
856 into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
857 let partitioning = parse_protobuf_partitioning(
858 repart.partitioning.as_ref(),
859 registry,
860 input.schema().as_ref(),
861 extension_codec,
862 )?;
863 Ok(Arc::new(RepartitionExec::try_new(
864 input,
865 partitioning.unwrap(),
866 )?))
867 }
868
869 fn try_into_global_limit_physical_plan(
870 &self,
871 limit: &protobuf::GlobalLimitExecNode,
872 registry: &dyn FunctionRegistry,
873 runtime: &RuntimeEnv,
874 extension_codec: &dyn PhysicalExtensionCodec,
875 ) -> Result<Arc<dyn ExecutionPlan>> {
876 let input: Arc<dyn ExecutionPlan> =
877 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
878 let fetch = if limit.fetch >= 0 {
879 Some(limit.fetch as usize)
880 } else {
881 None
882 };
883 Ok(Arc::new(GlobalLimitExec::new(
884 input,
885 limit.skip as usize,
886 fetch,
887 )))
888 }
889
890 fn try_into_local_limit_physical_plan(
891 &self,
892 limit: &protobuf::LocalLimitExecNode,
893 registry: &dyn FunctionRegistry,
894 runtime: &RuntimeEnv,
895 extension_codec: &dyn PhysicalExtensionCodec,
896 ) -> Result<Arc<dyn ExecutionPlan>> {
897 let input: Arc<dyn ExecutionPlan> =
898 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
899 Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
900 }
901
902 fn try_into_window_physical_plan(
903 &self,
904 window_agg: &protobuf::WindowAggExecNode,
905 registry: &dyn FunctionRegistry,
906 runtime: &RuntimeEnv,
907 extension_codec: &dyn PhysicalExtensionCodec,
908 ) -> Result<Arc<dyn ExecutionPlan>> {
909 let input: Arc<dyn ExecutionPlan> =
910 into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
911 let input_schema = input.schema();
912
913 let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
914 .window_expr
915 .iter()
916 .map(|window_expr| {
917 parse_physical_window_expr(
918 window_expr,
919 registry,
920 input_schema.as_ref(),
921 extension_codec,
922 )
923 })
924 .collect::<Result<Vec<_>, _>>()?;
925
926 let partition_keys = window_agg
927 .partition_keys
928 .iter()
929 .map(|expr| {
930 parse_physical_expr(
931 expr,
932 registry,
933 input.schema().as_ref(),
934 extension_codec,
935 )
936 })
937 .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
938
939 if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
940 let input_order_mode = match input_order_mode {
941 window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
942 window_agg_exec_node::InputOrderMode::PartiallySorted(
943 protobuf::PartiallySortedInputOrderMode { columns },
944 ) => InputOrderMode::PartiallySorted(
945 columns.iter().map(|c| *c as usize).collect(),
946 ),
947 window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
948 };
949
950 Ok(Arc::new(BoundedWindowAggExec::try_new(
951 physical_window_expr,
952 input,
953 input_order_mode,
954 !partition_keys.is_empty(),
955 )?))
956 } else {
957 Ok(Arc::new(WindowAggExec::try_new(
958 physical_window_expr,
959 input,
960 !partition_keys.is_empty(),
961 )?))
962 }
963 }
964
965 fn try_into_aggregate_physical_plan(
966 &self,
967 hash_agg: &protobuf::AggregateExecNode,
968 registry: &dyn FunctionRegistry,
969 runtime: &RuntimeEnv,
970 extension_codec: &dyn PhysicalExtensionCodec,
971 ) -> Result<Arc<dyn ExecutionPlan>> {
972 let input: Arc<dyn ExecutionPlan> =
973 into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
974 let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
975 proto_error(format!(
976 "Received a AggregateNode message with unknown AggregateMode {}",
977 hash_agg.mode
978 ))
979 })?;
980 let agg_mode: AggregateMode = match mode {
981 protobuf::AggregateMode::Partial => AggregateMode::Partial,
982 protobuf::AggregateMode::Final => AggregateMode::Final,
983 protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
984 protobuf::AggregateMode::Single => AggregateMode::Single,
985 protobuf::AggregateMode::SinglePartitioned => {
986 AggregateMode::SinglePartitioned
987 }
988 };
989
990 let num_expr = hash_agg.group_expr.len();
991
992 let group_expr = hash_agg
993 .group_expr
994 .iter()
995 .zip(hash_agg.group_expr_name.iter())
996 .map(|(expr, name)| {
997 parse_physical_expr(
998 expr,
999 registry,
1000 input.schema().as_ref(),
1001 extension_codec,
1002 )
1003 .map(|expr| (expr, name.to_string()))
1004 })
1005 .collect::<Result<Vec<_>, _>>()?;
1006
1007 let null_expr = hash_agg
1008 .null_expr
1009 .iter()
1010 .zip(hash_agg.group_expr_name.iter())
1011 .map(|(expr, name)| {
1012 parse_physical_expr(
1013 expr,
1014 registry,
1015 input.schema().as_ref(),
1016 extension_codec,
1017 )
1018 .map(|expr| (expr, name.to_string()))
1019 })
1020 .collect::<Result<Vec<_>, _>>()?;
1021
1022 let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
1023 hash_agg
1024 .groups
1025 .chunks(num_expr)
1026 .map(|g| g.to_vec())
1027 .collect::<Vec<Vec<bool>>>()
1028 } else {
1029 vec![]
1030 };
1031
1032 let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
1033 DataFusionError::Internal(
1034 "input_schema in AggregateNode is missing.".to_owned(),
1035 )
1036 })?;
1037 let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
1038
1039 let physical_filter_expr = hash_agg
1040 .filter_expr
1041 .iter()
1042 .map(|expr| {
1043 expr.expr
1044 .as_ref()
1045 .map(|e| {
1046 parse_physical_expr(
1047 e,
1048 registry,
1049 &physical_schema,
1050 extension_codec,
1051 )
1052 })
1053 .transpose()
1054 })
1055 .collect::<Result<Vec<_>, _>>()?;
1056
1057 let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1058 .aggr_expr
1059 .iter()
1060 .zip(hash_agg.aggr_expr_name.iter())
1061 .map(|(expr, name)| {
1062 let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1063 proto_error("Unexpected empty aggregate physical expression")
1064 })?;
1065
1066 match expr_type {
1067 ExprType::AggregateExpr(agg_node) => {
1068 let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1069 .expr
1070 .iter()
1071 .map(|e| {
1072 parse_physical_expr(
1073 e,
1074 registry,
1075 &physical_schema,
1076 extension_codec,
1077 )
1078 })
1079 .collect::<Result<Vec<_>>>()?;
1080 let order_bys = agg_node
1081 .ordering_req
1082 .iter()
1083 .map(|e| {
1084 parse_physical_sort_expr(
1085 e,
1086 registry,
1087 &physical_schema,
1088 extension_codec,
1089 )
1090 })
1091 .collect::<Result<_>>()?;
1092 agg_node
1093 .aggregate_function
1094 .as_ref()
1095 .map(|func| match func {
1096 AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1097 let agg_udf = match &agg_node.fun_definition {
1098 Some(buf) => extension_codec
1099 .try_decode_udaf(udaf_name, buf)?,
1100 None => {
1101 registry.udaf(udaf_name).or_else(|_| {
1102 extension_codec
1103 .try_decode_udaf(udaf_name, &[])
1104 })?
1105 }
1106 };
1107
1108 AggregateExprBuilder::new(agg_udf, input_phy_expr)
1109 .schema(Arc::clone(&physical_schema))
1110 .alias(name)
1111 .with_ignore_nulls(agg_node.ignore_nulls)
1112 .with_distinct(agg_node.distinct)
1113 .order_by(order_bys)
1114 .build()
1115 .map(Arc::new)
1116 }
1117 })
1118 .transpose()?
1119 .ok_or_else(|| {
1120 proto_error(
1121 "Invalid AggregateExpr, missing aggregate_function",
1122 )
1123 })
1124 }
1125 _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1126 }
1127 })
1128 .collect::<Result<Vec<_>, _>>()?;
1129
1130 let limit = hash_agg
1131 .limit
1132 .as_ref()
1133 .map(|lit_value| lit_value.limit as usize);
1134
1135 let agg = AggregateExec::try_new(
1136 agg_mode,
1137 PhysicalGroupBy::new(group_expr, null_expr, groups),
1138 physical_aggr_expr,
1139 physical_filter_expr,
1140 input,
1141 physical_schema,
1142 )?;
1143
1144 let agg = agg.with_limit(limit);
1145
1146 Ok(Arc::new(agg))
1147 }
1148
1149 fn try_into_hash_join_physical_plan(
1150 &self,
1151 hashjoin: &protobuf::HashJoinExecNode,
1152 registry: &dyn FunctionRegistry,
1153 runtime: &RuntimeEnv,
1154 extension_codec: &dyn PhysicalExtensionCodec,
1155 ) -> Result<Arc<dyn ExecutionPlan>> {
1156 let left: Arc<dyn ExecutionPlan> =
1157 into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
1158 let right: Arc<dyn ExecutionPlan> =
1159 into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
1160 let left_schema = left.schema();
1161 let right_schema = right.schema();
1162 let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1163 .on
1164 .iter()
1165 .map(|col| {
1166 let left = parse_physical_expr(
1167 &col.left.clone().unwrap(),
1168 registry,
1169 left_schema.as_ref(),
1170 extension_codec,
1171 )?;
1172 let right = parse_physical_expr(
1173 &col.right.clone().unwrap(),
1174 registry,
1175 right_schema.as_ref(),
1176 extension_codec,
1177 )?;
1178 Ok((left, right))
1179 })
1180 .collect::<Result<_>>()?;
1181 let join_type =
1182 protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1183 proto_error(format!(
1184 "Received a HashJoinNode message with unknown JoinType {}",
1185 hashjoin.join_type
1186 ))
1187 })?;
1188 let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1189 .map_err(|_| {
1190 proto_error(format!(
1191 "Received a HashJoinNode message with unknown NullEquality {}",
1192 hashjoin.null_equality
1193 ))
1194 })?;
1195 let filter = hashjoin
1196 .filter
1197 .as_ref()
1198 .map(|f| {
1199 let schema = f
1200 .schema
1201 .as_ref()
1202 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1203 .try_into()?;
1204
1205 let expression = parse_physical_expr(
1206 f.expression.as_ref().ok_or_else(|| {
1207 proto_error("Unexpected empty filter expression")
1208 })?,
1209 registry, &schema,
1210 extension_codec,
1211 )?;
1212 let column_indices = f.column_indices
1213 .iter()
1214 .map(|i| {
1215 let side = protobuf::JoinSide::try_from(i.side)
1216 .map_err(|_| proto_error(format!(
1217 "Received a HashJoinNode message with JoinSide in Filter {}",
1218 i.side))
1219 )?;
1220
1221 Ok(ColumnIndex {
1222 index: i.index as usize,
1223 side: side.into(),
1224 })
1225 })
1226 .collect::<Result<Vec<_>>>()?;
1227
1228 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1229 })
1230 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1231
1232 let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1233 .map_err(|_| {
1234 proto_error(format!(
1235 "Received a HashJoinNode message with unknown PartitionMode {}",
1236 hashjoin.partition_mode
1237 ))
1238 })?;
1239 let partition_mode = match partition_mode {
1240 protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1241 protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1242 protobuf::PartitionMode::Auto => PartitionMode::Auto,
1243 };
1244 let projection = if !hashjoin.projection.is_empty() {
1245 Some(
1246 hashjoin
1247 .projection
1248 .iter()
1249 .map(|i| *i as usize)
1250 .collect::<Vec<_>>(),
1251 )
1252 } else {
1253 None
1254 };
1255 Ok(Arc::new(HashJoinExec::try_new(
1256 left,
1257 right,
1258 on,
1259 filter,
1260 &join_type.into(),
1261 projection,
1262 partition_mode,
1263 null_equality.into(),
1264 )?))
1265 }
1266
1267 fn try_into_symmetric_hash_join_physical_plan(
1268 &self,
1269 sym_join: &protobuf::SymmetricHashJoinExecNode,
1270 registry: &dyn FunctionRegistry,
1271 runtime: &RuntimeEnv,
1272 extension_codec: &dyn PhysicalExtensionCodec,
1273 ) -> Result<Arc<dyn ExecutionPlan>> {
1274 let left =
1275 into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1276 let right =
1277 into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1278 let left_schema = left.schema();
1279 let right_schema = right.schema();
1280 let on = sym_join
1281 .on
1282 .iter()
1283 .map(|col| {
1284 let left = parse_physical_expr(
1285 &col.left.clone().unwrap(),
1286 registry,
1287 left_schema.as_ref(),
1288 extension_codec,
1289 )?;
1290 let right = parse_physical_expr(
1291 &col.right.clone().unwrap(),
1292 registry,
1293 right_schema.as_ref(),
1294 extension_codec,
1295 )?;
1296 Ok((left, right))
1297 })
1298 .collect::<Result<_>>()?;
1299 let join_type =
1300 protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1301 proto_error(format!(
1302 "Received a SymmetricHashJoin message with unknown JoinType {}",
1303 sym_join.join_type
1304 ))
1305 })?;
1306 let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1307 .map_err(|_| {
1308 proto_error(format!(
1309 "Received a SymmetricHashJoin message with unknown NullEquality {}",
1310 sym_join.null_equality
1311 ))
1312 })?;
1313 let filter = sym_join
1314 .filter
1315 .as_ref()
1316 .map(|f| {
1317 let schema = f
1318 .schema
1319 .as_ref()
1320 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1321 .try_into()?;
1322
1323 let expression = parse_physical_expr(
1324 f.expression.as_ref().ok_or_else(|| {
1325 proto_error("Unexpected empty filter expression")
1326 })?,
1327 registry, &schema,
1328 extension_codec,
1329 )?;
1330 let column_indices = f.column_indices
1331 .iter()
1332 .map(|i| {
1333 let side = protobuf::JoinSide::try_from(i.side)
1334 .map_err(|_| proto_error(format!(
1335 "Received a HashJoinNode message with JoinSide in Filter {}",
1336 i.side))
1337 )?;
1338
1339 Ok(ColumnIndex {
1340 index: i.index as usize,
1341 side: side.into(),
1342 })
1343 })
1344 .collect::<Result<_>>()?;
1345
1346 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1347 })
1348 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1349
1350 let left_sort_exprs = parse_physical_sort_exprs(
1351 &sym_join.left_sort_exprs,
1352 registry,
1353 &left_schema,
1354 extension_codec,
1355 )?;
1356 let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1357
1358 let right_sort_exprs = parse_physical_sort_exprs(
1359 &sym_join.right_sort_exprs,
1360 registry,
1361 &right_schema,
1362 extension_codec,
1363 )?;
1364 let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1365
1366 let partition_mode = protobuf::StreamPartitionMode::try_from(
1367 sym_join.partition_mode,
1368 )
1369 .map_err(|_| {
1370 proto_error(format!(
1371 "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1372 sym_join.partition_mode
1373 ))
1374 })?;
1375 let partition_mode = match partition_mode {
1376 protobuf::StreamPartitionMode::SinglePartition => {
1377 StreamJoinPartitionMode::SinglePartition
1378 }
1379 protobuf::StreamPartitionMode::PartitionedExec => {
1380 StreamJoinPartitionMode::Partitioned
1381 }
1382 };
1383 SymmetricHashJoinExec::try_new(
1384 left,
1385 right,
1386 on,
1387 filter,
1388 &join_type.into(),
1389 null_equality.into(),
1390 left_sort_exprs,
1391 right_sort_exprs,
1392 partition_mode,
1393 )
1394 .map(|e| Arc::new(e) as _)
1395 }
1396
1397 fn try_into_union_physical_plan(
1398 &self,
1399 union: &protobuf::UnionExecNode,
1400 registry: &dyn FunctionRegistry,
1401 runtime: &RuntimeEnv,
1402 extension_codec: &dyn PhysicalExtensionCodec,
1403 ) -> Result<Arc<dyn ExecutionPlan>> {
1404 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1405 for input in &union.inputs {
1406 inputs.push(input.try_into_physical_plan(
1407 registry,
1408 runtime,
1409 extension_codec,
1410 )?);
1411 }
1412 Ok(Arc::new(UnionExec::new(inputs)))
1413 }
1414
1415 fn try_into_interleave_physical_plan(
1416 &self,
1417 interleave: &protobuf::InterleaveExecNode,
1418 registry: &dyn FunctionRegistry,
1419 runtime: &RuntimeEnv,
1420 extension_codec: &dyn PhysicalExtensionCodec,
1421 ) -> Result<Arc<dyn ExecutionPlan>> {
1422 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1423 for input in &interleave.inputs {
1424 inputs.push(input.try_into_physical_plan(
1425 registry,
1426 runtime,
1427 extension_codec,
1428 )?);
1429 }
1430 Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1431 }
1432
1433 fn try_into_cross_join_physical_plan(
1434 &self,
1435 crossjoin: &protobuf::CrossJoinExecNode,
1436 registry: &dyn FunctionRegistry,
1437 runtime: &RuntimeEnv,
1438 extension_codec: &dyn PhysicalExtensionCodec,
1439 ) -> Result<Arc<dyn ExecutionPlan>> {
1440 let left: Arc<dyn ExecutionPlan> =
1441 into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1442 let right: Arc<dyn ExecutionPlan> =
1443 into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1444 Ok(Arc::new(CrossJoinExec::new(left, right)))
1445 }
1446
1447 fn try_into_empty_physical_plan(
1448 &self,
1449 empty: &protobuf::EmptyExecNode,
1450 _registry: &dyn FunctionRegistry,
1451 _runtime: &RuntimeEnv,
1452 _extension_codec: &dyn PhysicalExtensionCodec,
1453 ) -> Result<Arc<dyn ExecutionPlan>> {
1454 let schema = Arc::new(convert_required!(empty.schema)?);
1455 Ok(Arc::new(EmptyExec::new(schema)))
1456 }
1457
1458 fn try_into_placeholder_row_physical_plan(
1459 &self,
1460 placeholder: &protobuf::PlaceholderRowExecNode,
1461 _registry: &dyn FunctionRegistry,
1462 _runtime: &RuntimeEnv,
1463 _extension_codec: &dyn PhysicalExtensionCodec,
1464 ) -> Result<Arc<dyn ExecutionPlan>> {
1465 let schema = Arc::new(convert_required!(placeholder.schema)?);
1466 Ok(Arc::new(PlaceholderRowExec::new(schema)))
1467 }
1468
    /// Deserializes a `SortExecNode` into a [`SortExec`].
    ///
    /// Every serialized expression must be the `ExprType::Sort` variant; its
    /// inner expression is parsed against the input plan's schema and paired
    /// with the `asc`/`nulls_first` flags to rebuild a [`PhysicalSortExpr`].
    /// A negative `fetch` on the wire encodes "no limit".
    fn try_into_sort_physical_plan(
        &self,
        sort: &protobuf::SortExecNode,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                // The oneof payload must be present for a well-formed node.
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    // Unwrap the boxed inner expression of the sort node.
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
                        // `asc` is stored positively on the wire; SortOptions
                        // uses `descending`, hence the negation.
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    // Any other expression variant is invalid for a sort node.
                    internal_err!(
                        "physical_plan::from_proto() {self:?}"
                    )
                }
            })
            .collect::<Result<Vec<_>>>()?;
        // `LexOrdering::new` returns `None` for an empty expression list,
        // which is invalid for SortExec.
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortExec requires an ordering");
        };
        // fetch < 0 encodes "no limit".
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        let new_sort = SortExec::new(ordering, input)
            .with_fetch(fetch)
            .with_preserve_partitioning(sort.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }
1520
1521 fn try_into_sort_preserving_merge_physical_plan(
1522 &self,
1523 sort: &protobuf::SortPreservingMergeExecNode,
1524 registry: &dyn FunctionRegistry,
1525 runtime: &RuntimeEnv,
1526 extension_codec: &dyn PhysicalExtensionCodec,
1527 ) -> Result<Arc<dyn ExecutionPlan>> {
1528 let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1529 let exprs = sort
1530 .expr
1531 .iter()
1532 .map(|expr| {
1533 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1534 proto_error(format!(
1535 "physical_plan::from_proto() Unexpected expr {self:?}"
1536 ))
1537 })?;
1538 if let ExprType::Sort(sort_expr) = expr {
1539 let expr = sort_expr
1540 .expr
1541 .as_ref()
1542 .ok_or_else(|| {
1543 proto_error(format!(
1544 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1545 ))
1546 })?
1547 .as_ref();
1548 Ok(PhysicalSortExpr {
1549 expr: parse_physical_expr(
1550 expr,
1551 registry,
1552 input.schema().as_ref(),
1553 extension_codec,
1554 )?,
1555 options: SortOptions {
1556 descending: !sort_expr.asc,
1557 nulls_first: sort_expr.nulls_first,
1558 },
1559 })
1560 } else {
1561 internal_err!("physical_plan::from_proto() {self:?}")
1562 }
1563 })
1564 .collect::<Result<Vec<_>>>()?;
1565 let Some(ordering) = LexOrdering::new(exprs) else {
1566 return internal_err!("SortExec requires an ordering");
1567 };
1568 let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1569 Ok(Arc::new(
1570 SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1571 ))
1572 }
1573
1574 fn try_into_extension_physical_plan(
1575 &self,
1576 extension: &protobuf::PhysicalExtensionNode,
1577 registry: &dyn FunctionRegistry,
1578 runtime: &RuntimeEnv,
1579 extension_codec: &dyn PhysicalExtensionCodec,
1580 ) -> Result<Arc<dyn ExecutionPlan>> {
1581 let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1582 .inputs
1583 .iter()
1584 .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1585 .collect::<Result<_>>()?;
1586
1587 let extension_node =
1588 extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1589
1590 Ok(extension_node)
1591 }
1592
1593 fn try_into_nested_loop_join_physical_plan(
1594 &self,
1595 join: &protobuf::NestedLoopJoinExecNode,
1596 registry: &dyn FunctionRegistry,
1597 runtime: &RuntimeEnv,
1598 extension_codec: &dyn PhysicalExtensionCodec,
1599 ) -> Result<Arc<dyn ExecutionPlan>> {
1600 let left: Arc<dyn ExecutionPlan> =
1601 into_physical_plan(&join.left, registry, runtime, extension_codec)?;
1602 let right: Arc<dyn ExecutionPlan> =
1603 into_physical_plan(&join.right, registry, runtime, extension_codec)?;
1604 let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1605 proto_error(format!(
1606 "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1607 join.join_type
1608 ))
1609 })?;
1610 let filter = join
1611 .filter
1612 .as_ref()
1613 .map(|f| {
1614 let schema = f
1615 .schema
1616 .as_ref()
1617 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1618 .try_into()?;
1619
1620 let expression = parse_physical_expr(
1621 f.expression.as_ref().ok_or_else(|| {
1622 proto_error("Unexpected empty filter expression")
1623 })?,
1624 registry, &schema,
1625 extension_codec,
1626 )?;
1627 let column_indices = f.column_indices
1628 .iter()
1629 .map(|i| {
1630 let side = protobuf::JoinSide::try_from(i.side)
1631 .map_err(|_| proto_error(format!(
1632 "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1633 i.side))
1634 )?;
1635
1636 Ok(ColumnIndex {
1637 index: i.index as usize,
1638 side: side.into(),
1639 })
1640 })
1641 .collect::<Result<Vec<_>>>()?;
1642
1643 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1644 })
1645 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1646
1647 let projection = if !join.projection.is_empty() {
1648 Some(
1649 join.projection
1650 .iter()
1651 .map(|i| *i as usize)
1652 .collect::<Vec<_>>(),
1653 )
1654 } else {
1655 None
1656 };
1657
1658 Ok(Arc::new(NestedLoopJoinExec::try_new(
1659 left,
1660 right,
1661 filter,
1662 &join_type.into(),
1663 projection,
1664 )?))
1665 }
1666
1667 fn try_into_analyze_physical_plan(
1668 &self,
1669 analyze: &protobuf::AnalyzeExecNode,
1670 registry: &dyn FunctionRegistry,
1671 runtime: &RuntimeEnv,
1672 extension_codec: &dyn PhysicalExtensionCodec,
1673 ) -> Result<Arc<dyn ExecutionPlan>> {
1674 let input: Arc<dyn ExecutionPlan> =
1675 into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1676 Ok(Arc::new(AnalyzeExec::new(
1677 analyze.verbose,
1678 analyze.show_statistics,
1679 input,
1680 Arc::new(convert_required!(analyze.schema)?),
1681 )))
1682 }
1683
1684 fn try_into_json_sink_physical_plan(
1685 &self,
1686 sink: &protobuf::JsonSinkExecNode,
1687 registry: &dyn FunctionRegistry,
1688 runtime: &RuntimeEnv,
1689 extension_codec: &dyn PhysicalExtensionCodec,
1690 ) -> Result<Arc<dyn ExecutionPlan>> {
1691 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1692
1693 let data_sink: JsonSink = sink
1694 .sink
1695 .as_ref()
1696 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1697 .try_into()?;
1698 let sink_schema = input.schema();
1699 let sort_order = sink
1700 .sort_order
1701 .as_ref()
1702 .map(|collection| {
1703 parse_physical_sort_exprs(
1704 &collection.physical_sort_expr_nodes,
1705 registry,
1706 &sink_schema,
1707 extension_codec,
1708 )
1709 .map(|sort_exprs| {
1710 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1711 })
1712 })
1713 .transpose()?
1714 .flatten();
1715 Ok(Arc::new(DataSinkExec::new(
1716 input,
1717 Arc::new(data_sink),
1718 sort_order,
1719 )))
1720 }
1721
1722 fn try_into_csv_sink_physical_plan(
1723 &self,
1724 sink: &protobuf::CsvSinkExecNode,
1725 registry: &dyn FunctionRegistry,
1726 runtime: &RuntimeEnv,
1727 extension_codec: &dyn PhysicalExtensionCodec,
1728 ) -> Result<Arc<dyn ExecutionPlan>> {
1729 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1730
1731 let data_sink: CsvSink = sink
1732 .sink
1733 .as_ref()
1734 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1735 .try_into()?;
1736 let sink_schema = input.schema();
1737 let sort_order = sink
1738 .sort_order
1739 .as_ref()
1740 .map(|collection| {
1741 parse_physical_sort_exprs(
1742 &collection.physical_sort_expr_nodes,
1743 registry,
1744 &sink_schema,
1745 extension_codec,
1746 )
1747 .map(|sort_exprs| {
1748 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1749 })
1750 })
1751 .transpose()?
1752 .flatten();
1753 Ok(Arc::new(DataSinkExec::new(
1754 input,
1755 Arc::new(data_sink),
1756 sort_order,
1757 )))
1758 }
1759
1760 fn try_into_parquet_sink_physical_plan(
1761 &self,
1762 sink: &protobuf::ParquetSinkExecNode,
1763 registry: &dyn FunctionRegistry,
1764 runtime: &RuntimeEnv,
1765 extension_codec: &dyn PhysicalExtensionCodec,
1766 ) -> Result<Arc<dyn ExecutionPlan>> {
1767 #[cfg(feature = "parquet")]
1768 {
1769 let input =
1770 into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1771
1772 let data_sink: ParquetSink = sink
1773 .sink
1774 .as_ref()
1775 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1776 .try_into()?;
1777 let sink_schema = input.schema();
1778 let sort_order = sink
1779 .sort_order
1780 .as_ref()
1781 .map(|collection| {
1782 parse_physical_sort_exprs(
1783 &collection.physical_sort_expr_nodes,
1784 registry,
1785 &sink_schema,
1786 extension_codec,
1787 )
1788 .map(|sort_exprs| {
1789 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1790 })
1791 })
1792 .transpose()?
1793 .flatten();
1794 Ok(Arc::new(DataSinkExec::new(
1795 input,
1796 Arc::new(data_sink),
1797 sort_order,
1798 )))
1799 }
1800 #[cfg(not(feature = "parquet"))]
1801 panic!("Trying to use ParquetSink without `parquet` feature enabled");
1802 }
1803
1804 fn try_into_unnest_physical_plan(
1805 &self,
1806 unnest: &protobuf::UnnestExecNode,
1807 registry: &dyn FunctionRegistry,
1808 runtime: &RuntimeEnv,
1809 extension_codec: &dyn PhysicalExtensionCodec,
1810 ) -> Result<Arc<dyn ExecutionPlan>> {
1811 let input =
1812 into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1813
1814 Ok(Arc::new(UnnestExec::new(
1815 input,
1816 unnest
1817 .list_type_columns
1818 .iter()
1819 .map(|c| ListUnnest {
1820 index_in_input_schema: c.index_in_input_schema as _,
1821 depth: c.depth as _,
1822 })
1823 .collect(),
1824 unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1825 Arc::new(convert_required!(unnest.schema)?),
1826 into_required!(unnest.options)?,
1827 )))
1828 }
1829
1830 fn try_into_cooperative_physical_plan(
1831 &self,
1832 field_stream: &protobuf::CooperativeExecNode,
1833 registry: &dyn FunctionRegistry,
1834 runtime: &RuntimeEnv,
1835 extension_codec: &dyn PhysicalExtensionCodec,
1836 ) -> Result<Arc<dyn ExecutionPlan>> {
1837 let input =
1838 into_physical_plan(&field_stream.input, registry, runtime, extension_codec)?;
1839 Ok(Arc::new(CooperativeExec::new(input)))
1840 }
1841
1842 fn try_from_explain_exec(
1843 exec: &ExplainExec,
1844 _extension_codec: &dyn PhysicalExtensionCodec,
1845 ) -> Result<Self> {
1846 Ok(protobuf::PhysicalPlanNode {
1847 physical_plan_type: Some(PhysicalPlanType::Explain(
1848 protobuf::ExplainExecNode {
1849 schema: Some(exec.schema().as_ref().try_into()?),
1850 stringified_plans: exec
1851 .stringified_plans()
1852 .iter()
1853 .map(|plan| plan.into())
1854 .collect(),
1855 verbose: exec.verbose(),
1856 },
1857 )),
1858 })
1859 }
1860
1861 fn try_from_projection_exec(
1862 exec: &ProjectionExec,
1863 extension_codec: &dyn PhysicalExtensionCodec,
1864 ) -> Result<Self> {
1865 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1866 exec.input().to_owned(),
1867 extension_codec,
1868 )?;
1869 let expr = exec
1870 .expr()
1871 .iter()
1872 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1873 .collect::<Result<Vec<_>>>()?;
1874 let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1875 Ok(protobuf::PhysicalPlanNode {
1876 physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1877 protobuf::ProjectionExecNode {
1878 input: Some(Box::new(input)),
1879 expr,
1880 expr_name,
1881 },
1882 ))),
1883 })
1884 }
1885
1886 fn try_from_analyze_exec(
1887 exec: &AnalyzeExec,
1888 extension_codec: &dyn PhysicalExtensionCodec,
1889 ) -> Result<Self> {
1890 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1891 exec.input().to_owned(),
1892 extension_codec,
1893 )?;
1894 Ok(protobuf::PhysicalPlanNode {
1895 physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1896 protobuf::AnalyzeExecNode {
1897 verbose: exec.verbose(),
1898 show_statistics: exec.show_statistics(),
1899 input: Some(Box::new(input)),
1900 schema: Some(exec.schema().as_ref().try_into()?),
1901 },
1902 ))),
1903 })
1904 }
1905
1906 fn try_from_filter_exec(
1907 exec: &FilterExec,
1908 extension_codec: &dyn PhysicalExtensionCodec,
1909 ) -> Result<Self> {
1910 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1911 exec.input().to_owned(),
1912 extension_codec,
1913 )?;
1914 Ok(protobuf::PhysicalPlanNode {
1915 physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1916 protobuf::FilterExecNode {
1917 input: Some(Box::new(input)),
1918 expr: Some(serialize_physical_expr(
1919 exec.predicate(),
1920 extension_codec,
1921 )?),
1922 default_filter_selectivity: exec.default_selectivity() as u32,
1923 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1924 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1925 }),
1926 },
1927 ))),
1928 })
1929 }
1930
1931 fn try_from_global_limit_exec(
1932 limit: &GlobalLimitExec,
1933 extension_codec: &dyn PhysicalExtensionCodec,
1934 ) -> Result<Self> {
1935 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1936 limit.input().to_owned(),
1937 extension_codec,
1938 )?;
1939
1940 Ok(protobuf::PhysicalPlanNode {
1941 physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1942 protobuf::GlobalLimitExecNode {
1943 input: Some(Box::new(input)),
1944 skip: limit.skip() as u32,
1945 fetch: match limit.fetch() {
1946 Some(n) => n as i64,
1947 _ => -1, },
1949 },
1950 ))),
1951 })
1952 }
1953
1954 fn try_from_local_limit_exec(
1955 limit: &LocalLimitExec,
1956 extension_codec: &dyn PhysicalExtensionCodec,
1957 ) -> Result<Self> {
1958 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1959 limit.input().to_owned(),
1960 extension_codec,
1961 )?;
1962 Ok(protobuf::PhysicalPlanNode {
1963 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1964 protobuf::LocalLimitExecNode {
1965 input: Some(Box::new(input)),
1966 fetch: limit.fetch() as u32,
1967 },
1968 ))),
1969 })
1970 }
1971
1972 fn try_from_hash_join_exec(
1973 exec: &HashJoinExec,
1974 extension_codec: &dyn PhysicalExtensionCodec,
1975 ) -> Result<Self> {
1976 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1977 exec.left().to_owned(),
1978 extension_codec,
1979 )?;
1980 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1981 exec.right().to_owned(),
1982 extension_codec,
1983 )?;
1984 let on: Vec<protobuf::JoinOn> = exec
1985 .on()
1986 .iter()
1987 .map(|tuple| {
1988 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1989 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1990 Ok::<_, DataFusionError>(protobuf::JoinOn {
1991 left: Some(l),
1992 right: Some(r),
1993 })
1994 })
1995 .collect::<Result<_>>()?;
1996 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1997 let null_equality: protobuf::NullEquality = exec.null_equality().into();
1998 let filter = exec
1999 .filter()
2000 .as_ref()
2001 .map(|f| {
2002 let expression =
2003 serialize_physical_expr(f.expression(), extension_codec)?;
2004 let column_indices = f
2005 .column_indices()
2006 .iter()
2007 .map(|i| {
2008 let side: protobuf::JoinSide = i.side.to_owned().into();
2009 protobuf::ColumnIndex {
2010 index: i.index as u32,
2011 side: side.into(),
2012 }
2013 })
2014 .collect();
2015 let schema = f.schema().as_ref().try_into()?;
2016 Ok(protobuf::JoinFilter {
2017 expression: Some(expression),
2018 column_indices,
2019 schema: Some(schema),
2020 })
2021 })
2022 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2023
2024 let partition_mode = match exec.partition_mode() {
2025 PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2026 PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2027 PartitionMode::Auto => protobuf::PartitionMode::Auto,
2028 };
2029
2030 Ok(protobuf::PhysicalPlanNode {
2031 physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2032 protobuf::HashJoinExecNode {
2033 left: Some(Box::new(left)),
2034 right: Some(Box::new(right)),
2035 on,
2036 join_type: join_type.into(),
2037 partition_mode: partition_mode.into(),
2038 null_equality: null_equality.into(),
2039 filter,
2040 projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2041 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2042 }),
2043 },
2044 ))),
2045 })
2046 }
2047
2048 fn try_from_symmetric_hash_join_exec(
2049 exec: &SymmetricHashJoinExec,
2050 extension_codec: &dyn PhysicalExtensionCodec,
2051 ) -> Result<Self> {
2052 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2053 exec.left().to_owned(),
2054 extension_codec,
2055 )?;
2056 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2057 exec.right().to_owned(),
2058 extension_codec,
2059 )?;
2060 let on = exec
2061 .on()
2062 .iter()
2063 .map(|tuple| {
2064 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2065 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2066 Ok::<_, DataFusionError>(protobuf::JoinOn {
2067 left: Some(l),
2068 right: Some(r),
2069 })
2070 })
2071 .collect::<Result<_>>()?;
2072 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2073 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2074 let filter = exec
2075 .filter()
2076 .as_ref()
2077 .map(|f| {
2078 let expression =
2079 serialize_physical_expr(f.expression(), extension_codec)?;
2080 let column_indices = f
2081 .column_indices()
2082 .iter()
2083 .map(|i| {
2084 let side: protobuf::JoinSide = i.side.to_owned().into();
2085 protobuf::ColumnIndex {
2086 index: i.index as u32,
2087 side: side.into(),
2088 }
2089 })
2090 .collect();
2091 let schema = f.schema().as_ref().try_into()?;
2092 Ok(protobuf::JoinFilter {
2093 expression: Some(expression),
2094 column_indices,
2095 schema: Some(schema),
2096 })
2097 })
2098 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2099
2100 let partition_mode = match exec.partition_mode() {
2101 StreamJoinPartitionMode::SinglePartition => {
2102 protobuf::StreamPartitionMode::SinglePartition
2103 }
2104 StreamJoinPartitionMode::Partitioned => {
2105 protobuf::StreamPartitionMode::PartitionedExec
2106 }
2107 };
2108
2109 let left_sort_exprs = exec
2110 .left_sort_exprs()
2111 .map(|exprs| {
2112 exprs
2113 .iter()
2114 .map(|expr| {
2115 Ok(protobuf::PhysicalSortExprNode {
2116 expr: Some(Box::new(serialize_physical_expr(
2117 &expr.expr,
2118 extension_codec,
2119 )?)),
2120 asc: !expr.options.descending,
2121 nulls_first: expr.options.nulls_first,
2122 })
2123 })
2124 .collect::<Result<Vec<_>>>()
2125 })
2126 .transpose()?
2127 .unwrap_or(vec![]);
2128
2129 let right_sort_exprs = exec
2130 .right_sort_exprs()
2131 .map(|exprs| {
2132 exprs
2133 .iter()
2134 .map(|expr| {
2135 Ok(protobuf::PhysicalSortExprNode {
2136 expr: Some(Box::new(serialize_physical_expr(
2137 &expr.expr,
2138 extension_codec,
2139 )?)),
2140 asc: !expr.options.descending,
2141 nulls_first: expr.options.nulls_first,
2142 })
2143 })
2144 .collect::<Result<Vec<_>>>()
2145 })
2146 .transpose()?
2147 .unwrap_or(vec![]);
2148
2149 Ok(protobuf::PhysicalPlanNode {
2150 physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2151 protobuf::SymmetricHashJoinExecNode {
2152 left: Some(Box::new(left)),
2153 right: Some(Box::new(right)),
2154 on,
2155 join_type: join_type.into(),
2156 partition_mode: partition_mode.into(),
2157 null_equality: null_equality.into(),
2158 left_sort_exprs,
2159 right_sort_exprs,
2160 filter,
2161 },
2162 ))),
2163 })
2164 }
2165
2166 fn try_from_cross_join_exec(
2167 exec: &CrossJoinExec,
2168 extension_codec: &dyn PhysicalExtensionCodec,
2169 ) -> Result<Self> {
2170 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2171 exec.left().to_owned(),
2172 extension_codec,
2173 )?;
2174 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2175 exec.right().to_owned(),
2176 extension_codec,
2177 )?;
2178 Ok(protobuf::PhysicalPlanNode {
2179 physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2180 protobuf::CrossJoinExecNode {
2181 left: Some(Box::new(left)),
2182 right: Some(Box::new(right)),
2183 },
2184 ))),
2185 })
2186 }
2187
    /// Serializes an [`AggregateExec`] into a `protobuf::AggregateExecNode`.
    ///
    /// The proto stores group expressions, aggregate expressions, filters and
    /// their names in parallel vectors, so each vector below is built in the
    /// same iteration order as its counterpart.
    fn try_from_aggregate_exec(
        exec: &AggregateExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        // Grouping-set null mask, flattened across all grouping sets.
        let groups: Vec<bool> = exec
            .group_expr()
            .groups()
            .iter()
            .flatten()
            .copied()
            .collect();

        // Output names of the group expressions (parallel to `group_expr`).
        let group_names = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| expr.1.to_owned())
            .collect();

        // Per-aggregate optional filter expressions (parallel to `agg`).
        let filter = exec
            .filter_expr()
            .iter()
            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg = exec
            .aggr_expr()
            .iter()
            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        // Output names of the aggregate expressions (parallel to `agg`).
        let agg_names = exec
            .aggr_expr()
            .iter()
            .map(|expr| expr.name().to_string())
            .collect::<Vec<_>>();

        let agg_mode = match exec.mode() {
            AggregateMode::Partial => protobuf::AggregateMode::Partial,
            AggregateMode::Final => protobuf::AggregateMode::Final,
            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
            AggregateMode::Single => protobuf::AggregateMode::Single,
            AggregateMode::SinglePartitioned => {
                protobuf::AggregateMode::SinglePartitioned
            }
        };
        let input_schema = exec.input_schema();
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        // Null expressions of the grouping sets (parallel to `group_expr`).
        let null_expr = exec
            .group_expr()
            .null_expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let group_expr = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        // Optional group-limit ("limited aggregation") hint.
        let limit = exec.limit().map(|value| protobuf::AggLimit {
            limit: value as u64,
        });

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                protobuf::AggregateExecNode {
                    group_expr,
                    group_expr_name: group_names,
                    aggr_expr: agg,
                    filter_expr: filter,
                    aggr_expr_name: agg_names,
                    mode: agg_mode as i32,
                    input: Some(Box::new(input)),
                    input_schema: Some(input_schema.as_ref().try_into()?),
                    null_expr,
                    groups,
                    limit,
                },
            ))),
        })
    }
2276
2277 fn try_from_empty_exec(
2278 empty: &EmptyExec,
2279 _extension_codec: &dyn PhysicalExtensionCodec,
2280 ) -> Result<Self> {
2281 let schema = empty.schema().as_ref().try_into()?;
2282 Ok(protobuf::PhysicalPlanNode {
2283 physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2284 schema: Some(schema),
2285 })),
2286 })
2287 }
2288
2289 fn try_from_placeholder_row_exec(
2290 empty: &PlaceholderRowExec,
2291 _extension_codec: &dyn PhysicalExtensionCodec,
2292 ) -> Result<Self> {
2293 let schema = empty.schema().as_ref().try_into()?;
2294 Ok(protobuf::PhysicalPlanNode {
2295 physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2296 protobuf::PlaceholderRowExecNode {
2297 schema: Some(schema),
2298 },
2299 )),
2300 })
2301 }
2302
2303 fn try_from_coalesce_batches_exec(
2304 coalesce_batches: &CoalesceBatchesExec,
2305 extension_codec: &dyn PhysicalExtensionCodec,
2306 ) -> Result<Self> {
2307 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2308 coalesce_batches.input().to_owned(),
2309 extension_codec,
2310 )?;
2311 Ok(protobuf::PhysicalPlanNode {
2312 physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2313 protobuf::CoalesceBatchesExecNode {
2314 input: Some(Box::new(input)),
2315 target_batch_size: coalesce_batches.target_batch_size() as u32,
2316 fetch: coalesce_batches.fetch().map(|n| n as u32),
2317 },
2318 ))),
2319 })
2320 }
2321
    /// Attempts to serialize a [`DataSourceExec`] file scan into a protobuf node.
    ///
    /// Probes the exec's data source for each supported file format in turn —
    /// CSV, JSON, then (feature-gated) Parquet and Avro — and returns the
    /// matching scan node. Returns `Ok(None)` when no format matches, so the
    /// caller can fall back to an extension codec.
    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let data_source = data_source_exec.data_source();
        // CSV: a FileScanConfig whose file source downcasts to CsvSource.
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                extension_codec,
                            )?),
                            has_header: csv_config.has_header(),
                            // Single-byte CSV options are transported as strings.
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                    byte_to_string(comment, "comment")?,
                                ))
                            } else {
                                None
                            },
                            newlines_in_values: maybe_csv.newlines_in_values(),
                        },
                    )),
                }));
            }
        }

        // JSON: same FileScanConfig probe, but with a JsonSource file source.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Parquet: also serializes the pushed-down predicate and table options.
        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .predicate()
                .map(|pred| serialize_physical_expr(pred, extension_codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            extension_codec,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        // Avro: only the base file-scan configuration is serialized.
        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Unrecognized data source; the caller decides how to proceed.
        Ok(None)
    }
2424
2425 fn try_from_coalesce_partitions_exec(
2426 exec: &CoalescePartitionsExec,
2427 extension_codec: &dyn PhysicalExtensionCodec,
2428 ) -> Result<Self> {
2429 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2430 exec.input().to_owned(),
2431 extension_codec,
2432 )?;
2433 Ok(protobuf::PhysicalPlanNode {
2434 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2435 protobuf::CoalescePartitionsExecNode {
2436 input: Some(Box::new(input)),
2437 fetch: exec.fetch().map(|f| f as u32),
2438 },
2439 ))),
2440 })
2441 }
2442
2443 fn try_from_repartition_exec(
2444 exec: &RepartitionExec,
2445 extension_codec: &dyn PhysicalExtensionCodec,
2446 ) -> Result<Self> {
2447 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2448 exec.input().to_owned(),
2449 extension_codec,
2450 )?;
2451
2452 let pb_partitioning =
2453 serialize_partitioning(exec.partitioning(), extension_codec)?;
2454
2455 Ok(protobuf::PhysicalPlanNode {
2456 physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2457 protobuf::RepartitionExecNode {
2458 input: Some(Box::new(input)),
2459 partitioning: Some(pb_partitioning),
2460 },
2461 ))),
2462 })
2463 }
2464
2465 fn try_from_sort_exec(
2466 exec: &SortExec,
2467 extension_codec: &dyn PhysicalExtensionCodec,
2468 ) -> Result<Self> {
2469 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2470 exec.input().to_owned(),
2471 extension_codec,
2472 )?;
2473 let expr = exec
2474 .expr()
2475 .iter()
2476 .map(|expr| {
2477 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2478 expr: Some(Box::new(serialize_physical_expr(
2479 &expr.expr,
2480 extension_codec,
2481 )?)),
2482 asc: !expr.options.descending,
2483 nulls_first: expr.options.nulls_first,
2484 });
2485 Ok(protobuf::PhysicalExprNode {
2486 expr_type: Some(ExprType::Sort(sort_expr)),
2487 })
2488 })
2489 .collect::<Result<Vec<_>>>()?;
2490 Ok(protobuf::PhysicalPlanNode {
2491 physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2492 protobuf::SortExecNode {
2493 input: Some(Box::new(input)),
2494 expr,
2495 fetch: match exec.fetch() {
2496 Some(n) => n as i64,
2497 _ => -1,
2498 },
2499 preserve_partitioning: exec.preserve_partitioning(),
2500 },
2501 ))),
2502 })
2503 }
2504
2505 fn try_from_union_exec(
2506 union: &UnionExec,
2507 extension_codec: &dyn PhysicalExtensionCodec,
2508 ) -> Result<Self> {
2509 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2510 for input in union.inputs() {
2511 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2512 input.to_owned(),
2513 extension_codec,
2514 )?);
2515 }
2516 Ok(protobuf::PhysicalPlanNode {
2517 physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2518 inputs,
2519 })),
2520 })
2521 }
2522
2523 fn try_from_interleave_exec(
2524 interleave: &InterleaveExec,
2525 extension_codec: &dyn PhysicalExtensionCodec,
2526 ) -> Result<Self> {
2527 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2528 for input in interleave.inputs() {
2529 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2530 input.to_owned(),
2531 extension_codec,
2532 )?);
2533 }
2534 Ok(protobuf::PhysicalPlanNode {
2535 physical_plan_type: Some(PhysicalPlanType::Interleave(
2536 protobuf::InterleaveExecNode { inputs },
2537 )),
2538 })
2539 }
2540
2541 fn try_from_sort_preserving_merge_exec(
2542 exec: &SortPreservingMergeExec,
2543 extension_codec: &dyn PhysicalExtensionCodec,
2544 ) -> Result<Self> {
2545 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2546 exec.input().to_owned(),
2547 extension_codec,
2548 )?;
2549 let expr = exec
2550 .expr()
2551 .iter()
2552 .map(|expr| {
2553 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2554 expr: Some(Box::new(serialize_physical_expr(
2555 &expr.expr,
2556 extension_codec,
2557 )?)),
2558 asc: !expr.options.descending,
2559 nulls_first: expr.options.nulls_first,
2560 });
2561 Ok(protobuf::PhysicalExprNode {
2562 expr_type: Some(ExprType::Sort(sort_expr)),
2563 })
2564 })
2565 .collect::<Result<Vec<_>>>()?;
2566 Ok(protobuf::PhysicalPlanNode {
2567 physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2568 protobuf::SortPreservingMergeExecNode {
2569 input: Some(Box::new(input)),
2570 expr,
2571 fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2572 },
2573 ))),
2574 })
2575 }
2576
2577 fn try_from_nested_loop_join_exec(
2578 exec: &NestedLoopJoinExec,
2579 extension_codec: &dyn PhysicalExtensionCodec,
2580 ) -> Result<Self> {
2581 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2582 exec.left().to_owned(),
2583 extension_codec,
2584 )?;
2585 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2586 exec.right().to_owned(),
2587 extension_codec,
2588 )?;
2589
2590 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2591 let filter = exec
2592 .filter()
2593 .as_ref()
2594 .map(|f| {
2595 let expression =
2596 serialize_physical_expr(f.expression(), extension_codec)?;
2597 let column_indices = f
2598 .column_indices()
2599 .iter()
2600 .map(|i| {
2601 let side: protobuf::JoinSide = i.side.to_owned().into();
2602 protobuf::ColumnIndex {
2603 index: i.index as u32,
2604 side: side.into(),
2605 }
2606 })
2607 .collect();
2608 let schema = f.schema().as_ref().try_into()?;
2609 Ok(protobuf::JoinFilter {
2610 expression: Some(expression),
2611 column_indices,
2612 schema: Some(schema),
2613 })
2614 })
2615 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2616
2617 Ok(protobuf::PhysicalPlanNode {
2618 physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2619 protobuf::NestedLoopJoinExecNode {
2620 left: Some(Box::new(left)),
2621 right: Some(Box::new(right)),
2622 join_type: join_type.into(),
2623 filter,
2624 projection: exec.projection().map_or_else(Vec::new, |v| {
2625 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2626 }),
2627 },
2628 ))),
2629 })
2630 }
2631
2632 fn try_from_window_agg_exec(
2633 exec: &WindowAggExec,
2634 extension_codec: &dyn PhysicalExtensionCodec,
2635 ) -> Result<Self> {
2636 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2637 exec.input().to_owned(),
2638 extension_codec,
2639 )?;
2640
2641 let window_expr = exec
2642 .window_expr()
2643 .iter()
2644 .map(|e| serialize_physical_window_expr(e, extension_codec))
2645 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2646
2647 let partition_keys = exec
2648 .partition_keys()
2649 .iter()
2650 .map(|e| serialize_physical_expr(e, extension_codec))
2651 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2652
2653 Ok(protobuf::PhysicalPlanNode {
2654 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2655 protobuf::WindowAggExecNode {
2656 input: Some(Box::new(input)),
2657 window_expr,
2658 partition_keys,
2659 input_order_mode: None,
2660 },
2661 ))),
2662 })
2663 }
2664
2665 fn try_from_bounded_window_agg_exec(
2666 exec: &BoundedWindowAggExec,
2667 extension_codec: &dyn PhysicalExtensionCodec,
2668 ) -> Result<Self> {
2669 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2670 exec.input().to_owned(),
2671 extension_codec,
2672 )?;
2673
2674 let window_expr = exec
2675 .window_expr()
2676 .iter()
2677 .map(|e| serialize_physical_window_expr(e, extension_codec))
2678 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2679
2680 let partition_keys = exec
2681 .partition_keys()
2682 .iter()
2683 .map(|e| serialize_physical_expr(e, extension_codec))
2684 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2685
2686 let input_order_mode = match &exec.input_order_mode {
2687 InputOrderMode::Linear => {
2688 window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2689 }
2690 InputOrderMode::PartiallySorted(columns) => {
2691 window_agg_exec_node::InputOrderMode::PartiallySorted(
2692 protobuf::PartiallySortedInputOrderMode {
2693 columns: columns.iter().map(|c| *c as u64).collect(),
2694 },
2695 )
2696 }
2697 InputOrderMode::Sorted => {
2698 window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2699 }
2700 };
2701
2702 Ok(protobuf::PhysicalPlanNode {
2703 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2704 protobuf::WindowAggExecNode {
2705 input: Some(Box::new(input)),
2706 window_expr,
2707 partition_keys,
2708 input_order_mode: Some(input_order_mode),
2709 },
2710 ))),
2711 })
2712 }
2713
    /// Attempts to serialize a [`DataSinkExec`] into a protobuf node.
    ///
    /// Probes the sink type in turn (JSON, CSV, then feature-gated Parquet)
    /// and returns the matching sink node. Returns `Ok(None)` when the sink
    /// type is unrecognized, so the caller can fall back to an extension codec.
    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
        // Serialize the optional required input ordering; each requirement is
        // converted to a concrete PhysicalSortExpr before encoding.
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        // Exactly one of the branches below consumes `input` and `sort_order`;
        // each returns immediately on a successful downcast.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Unrecognized sink type; the caller decides how to proceed.
        Ok(None)
    }
2790
2791 fn try_from_unnest_exec(
2792 exec: &UnnestExec,
2793 extension_codec: &dyn PhysicalExtensionCodec,
2794 ) -> Result<Self> {
2795 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2796 exec.input().to_owned(),
2797 extension_codec,
2798 )?;
2799
2800 Ok(protobuf::PhysicalPlanNode {
2801 physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2802 protobuf::UnnestExecNode {
2803 input: Some(Box::new(input)),
2804 schema: Some(exec.schema().try_into()?),
2805 list_type_columns: exec
2806 .list_column_indices()
2807 .iter()
2808 .map(|c| ProtoListUnnest {
2809 index_in_input_schema: c.index_in_input_schema as _,
2810 depth: c.depth as _,
2811 })
2812 .collect(),
2813 struct_type_columns: exec
2814 .struct_column_indices()
2815 .iter()
2816 .map(|c| *c as _)
2817 .collect(),
2818 options: Some(exec.options().into()),
2819 },
2820 ))),
2821 })
2822 }
2823
2824 fn try_from_cooperative_exec(
2825 exec: &CooperativeExec,
2826 extension_codec: &dyn PhysicalExtensionCodec,
2827 ) -> Result<Self> {
2828 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2829 exec.input().to_owned(),
2830 extension_codec,
2831 )?;
2832
2833 Ok(protobuf::PhysicalPlanNode {
2834 physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
2835 protobuf::CooperativeExecNode {
2836 input: Some(Box::new(input)),
2837 },
2838 ))),
2839 })
2840 }
2841}
2842
/// Conversion contract between a physical [`ExecutionPlan`] and its protobuf
/// wire representation.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes protobuf-encoded bytes in `buf` into this plan-node type.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this plan node into `buf` as protobuf bytes.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this protobuf node back into an executable plan, looking up
    /// functions via `registry` and delegating custom nodes to
    /// `extension_codec`.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the protobuf node from an executable plan, delegating plan
    /// types it does not recognize to `extension_codec`.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
2867
/// Hook for (de)serializing user-defined physical plan nodes, expressions and
/// functions that the built-in protobuf schema does not cover.
///
/// The UDF/UDAF/UDWF encode methods default to a no-op (`Ok(())`) so codecs
/// only need to override the hooks they actually use; the decode methods
/// default to a "not implemented" error.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom execution plan from `buf`, given its already-decoded
    /// child plans and a function registry for lookups.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes a custom execution plan into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF by `name`; errors by default.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF; a no-op by default.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom physical expression from `buf`, given its
    /// already-decoded child expressions; errors by default.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a custom physical expression; errors by default.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF by `name`; errors by default.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF; a no-op by default.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF by `name`; errors by default.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF; a no-op by default.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
2920
/// No-op [`PhysicalExtensionCodec`] used when a plan contains no extension
/// nodes; any attempt to encode or decode a custom node is an error.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}

impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    /// Always errors: the default codec cannot decode custom plan nodes.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Always errors: the default codec cannot encode custom plan nodes.
    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
2942
2943fn into_physical_plan(
2944 node: &Option<Box<protobuf::PhysicalPlanNode>>,
2945 registry: &dyn FunctionRegistry,
2946 runtime: &RuntimeEnv,
2947 extension_codec: &dyn PhysicalExtensionCodec,
2948) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2949 if let Some(field) = node {
2950 field.try_into_physical_plan(registry, runtime, extension_codec)
2951 } else {
2952 Err(proto_error("Missing required field in protobuf"))
2953 }
2954}