1use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25 parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26 parse_physical_window_expr, parse_protobuf_file_scan_config,
27 parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31 serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37 self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::{Schema, SchemaRef};
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53 CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::coop::CooperativeExec;
68use datafusion::physical_plan::empty::EmptyExec;
69use datafusion::physical_plan::explain::ExplainExec;
70use datafusion::physical_plan::expressions::PhysicalSortExpr;
71use datafusion::physical_plan::filter::FilterExec;
72use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
73use datafusion::physical_plan::joins::{
74 CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
75};
76use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
77use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
78use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
79use datafusion::physical_plan::projection::ProjectionExec;
80use datafusion::physical_plan::repartition::RepartitionExec;
81use datafusion::physical_plan::sorts::sort::SortExec;
82use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
83use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
84use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
85use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
86use datafusion::physical_plan::{
87 ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
88};
89use datafusion_common::config::TableParquetOptions;
90use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
91use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
92
93use prost::bytes::BufMut;
94use prost::Message;
95
96pub mod from_proto;
97pub mod to_proto;
98
99impl AsExecutionPlan for protobuf::PhysicalPlanNode {
100 fn try_decode(buf: &[u8]) -> Result<Self>
101 where
102 Self: Sized,
103 {
104 protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
105 DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
106 })
107 }
108
109 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
110 where
111 B: BufMut,
112 Self: Sized,
113 {
114 self.encode(buf).map_err(|e| {
115 DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
116 })
117 }
118
119 fn try_into_physical_plan(
120 &self,
121 registry: &dyn FunctionRegistry,
122 runtime: &RuntimeEnv,
123 extension_codec: &dyn PhysicalExtensionCodec,
124 ) -> Result<Arc<dyn ExecutionPlan>> {
125 let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
126 proto_error(format!(
127 "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
128 ))
129 })?;
130 match plan {
131 PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
132 explain,
133 registry,
134 runtime,
135 extension_codec,
136 ),
137 PhysicalPlanType::Projection(projection) => self
138 .try_into_projection_physical_plan(
139 projection,
140 registry,
141 runtime,
142 extension_codec,
143 ),
144 PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
145 filter,
146 registry,
147 runtime,
148 extension_codec,
149 ),
150 PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
151 scan,
152 registry,
153 runtime,
154 extension_codec,
155 ),
156 PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
157 scan,
158 registry,
159 runtime,
160 extension_codec,
161 ),
162 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
163 PhysicalPlanType::ParquetScan(scan) => self
164 .try_into_parquet_scan_physical_plan(
165 scan,
166 registry,
167 runtime,
168 extension_codec,
169 ),
170 #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
171 PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
172 scan,
173 registry,
174 runtime,
175 extension_codec,
176 ),
177 PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
178 .try_into_coalesce_batches_physical_plan(
179 coalesce_batches,
180 registry,
181 runtime,
182 extension_codec,
183 ),
184 PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
185 merge,
186 registry,
187 runtime,
188 extension_codec,
189 ),
190 PhysicalPlanType::Repartition(repart) => self
191 .try_into_repartition_physical_plan(
192 repart,
193 registry,
194 runtime,
195 extension_codec,
196 ),
197 PhysicalPlanType::GlobalLimit(limit) => self
198 .try_into_global_limit_physical_plan(
199 limit,
200 registry,
201 runtime,
202 extension_codec,
203 ),
204 PhysicalPlanType::LocalLimit(limit) => self
205 .try_into_local_limit_physical_plan(
206 limit,
207 registry,
208 runtime,
209 extension_codec,
210 ),
211 PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
212 window_agg,
213 registry,
214 runtime,
215 extension_codec,
216 ),
217 PhysicalPlanType::Aggregate(hash_agg) => self
218 .try_into_aggregate_physical_plan(
219 hash_agg,
220 registry,
221 runtime,
222 extension_codec,
223 ),
224 PhysicalPlanType::HashJoin(hashjoin) => self
225 .try_into_hash_join_physical_plan(
226 hashjoin,
227 registry,
228 runtime,
229 extension_codec,
230 ),
231 PhysicalPlanType::SymmetricHashJoin(sym_join) => self
232 .try_into_symmetric_hash_join_physical_plan(
233 sym_join,
234 registry,
235 runtime,
236 extension_codec,
237 ),
238 PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
239 union,
240 registry,
241 runtime,
242 extension_codec,
243 ),
244 PhysicalPlanType::Interleave(interleave) => self
245 .try_into_interleave_physical_plan(
246 interleave,
247 registry,
248 runtime,
249 extension_codec,
250 ),
251 PhysicalPlanType::CrossJoin(crossjoin) => self
252 .try_into_cross_join_physical_plan(
253 crossjoin,
254 registry,
255 runtime,
256 extension_codec,
257 ),
258 PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
259 empty,
260 registry,
261 runtime,
262 extension_codec,
263 ),
264 PhysicalPlanType::PlaceholderRow(placeholder) => self
265 .try_into_placeholder_row_physical_plan(
266 placeholder,
267 registry,
268 runtime,
269 extension_codec,
270 ),
271 PhysicalPlanType::Sort(sort) => {
272 self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
273 }
274 PhysicalPlanType::SortPreservingMerge(sort) => self
275 .try_into_sort_preserving_merge_physical_plan(
276 sort,
277 registry,
278 runtime,
279 extension_codec,
280 ),
281 PhysicalPlanType::Extension(extension) => self
282 .try_into_extension_physical_plan(
283 extension,
284 registry,
285 runtime,
286 extension_codec,
287 ),
288 PhysicalPlanType::NestedLoopJoin(join) => self
289 .try_into_nested_loop_join_physical_plan(
290 join,
291 registry,
292 runtime,
293 extension_codec,
294 ),
295 PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
296 analyze,
297 registry,
298 runtime,
299 extension_codec,
300 ),
301 PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
302 sink,
303 registry,
304 runtime,
305 extension_codec,
306 ),
307 PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
308 sink,
309 registry,
310 runtime,
311 extension_codec,
312 ),
313 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
314 PhysicalPlanType::ParquetSink(sink) => self
315 .try_into_parquet_sink_physical_plan(
316 sink,
317 registry,
318 runtime,
319 extension_codec,
320 ),
321 PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
322 unnest,
323 registry,
324 runtime,
325 extension_codec,
326 ),
327 PhysicalPlanType::Cooperative(cooperative) => self
328 .try_into_cooperative_physical_plan(
329 cooperative,
330 registry,
331 runtime,
332 extension_codec,
333 ),
334 }
335 }
336
337 fn try_from_physical_plan(
338 plan: Arc<dyn ExecutionPlan>,
339 extension_codec: &dyn PhysicalExtensionCodec,
340 ) -> Result<Self>
341 where
342 Self: Sized,
343 {
344 let plan_clone = Arc::clone(&plan);
345 let plan = plan.as_any();
346
347 if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
348 return protobuf::PhysicalPlanNode::try_from_explain_exec(
349 exec,
350 extension_codec,
351 );
352 }
353
354 if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
355 return protobuf::PhysicalPlanNode::try_from_projection_exec(
356 exec,
357 extension_codec,
358 );
359 }
360
361 if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
362 return protobuf::PhysicalPlanNode::try_from_analyze_exec(
363 exec,
364 extension_codec,
365 );
366 }
367
368 if let Some(exec) = plan.downcast_ref::<FilterExec>() {
369 return protobuf::PhysicalPlanNode::try_from_filter_exec(
370 exec,
371 extension_codec,
372 );
373 }
374
375 if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
376 return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
377 limit,
378 extension_codec,
379 );
380 }
381
382 if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
383 return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
384 limit,
385 extension_codec,
386 );
387 }
388
389 if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
390 return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
391 exec,
392 extension_codec,
393 );
394 }
395
396 if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
397 return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
398 exec,
399 extension_codec,
400 );
401 }
402
403 if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
404 return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
405 exec,
406 extension_codec,
407 );
408 }
409
410 if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
411 return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
412 exec,
413 extension_codec,
414 );
415 }
416
417 if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
418 return protobuf::PhysicalPlanNode::try_from_empty_exec(
419 empty,
420 extension_codec,
421 );
422 }
423
424 if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
425 return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
426 empty,
427 extension_codec,
428 );
429 }
430
431 if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
432 return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
433 coalesce_batches,
434 extension_codec,
435 );
436 }
437
438 if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
439 if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
440 data_source_exec,
441 extension_codec,
442 )? {
443 return Ok(node);
444 }
445 }
446
447 if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
448 return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
449 exec,
450 extension_codec,
451 );
452 }
453
454 if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
455 return protobuf::PhysicalPlanNode::try_from_repartition_exec(
456 exec,
457 extension_codec,
458 );
459 }
460
461 if let Some(exec) = plan.downcast_ref::<SortExec>() {
462 return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
463 }
464
465 if let Some(union) = plan.downcast_ref::<UnionExec>() {
466 return protobuf::PhysicalPlanNode::try_from_union_exec(
467 union,
468 extension_codec,
469 );
470 }
471
472 if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
473 return protobuf::PhysicalPlanNode::try_from_interleave_exec(
474 interleave,
475 extension_codec,
476 );
477 }
478
479 if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
480 return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
481 exec,
482 extension_codec,
483 );
484 }
485
486 if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
487 return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
488 exec,
489 extension_codec,
490 );
491 }
492
493 if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
494 return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
495 exec,
496 extension_codec,
497 );
498 }
499
500 if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
501 return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
502 exec,
503 extension_codec,
504 );
505 }
506
507 if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
508 if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
509 exec,
510 extension_codec,
511 )? {
512 return Ok(node);
513 }
514 }
515
516 if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
517 return protobuf::PhysicalPlanNode::try_from_unnest_exec(
518 exec,
519 extension_codec,
520 );
521 }
522
523 if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
524 return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
525 exec,
526 extension_codec,
527 );
528 }
529
530 let mut buf: Vec<u8> = vec![];
531 match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
532 Ok(_) => {
533 let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
534 .children()
535 .into_iter()
536 .cloned()
537 .map(|i| {
538 protobuf::PhysicalPlanNode::try_from_physical_plan(
539 i,
540 extension_codec,
541 )
542 })
543 .collect::<Result<_>>()?;
544
545 Ok(protobuf::PhysicalPlanNode {
546 physical_plan_type: Some(PhysicalPlanType::Extension(
547 protobuf::PhysicalExtensionNode { node: buf, inputs },
548 )),
549 })
550 }
551 Err(e) => internal_err!(
552 "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
553 ),
554 }
555 }
556}
557
558impl protobuf::PhysicalPlanNode {
559 fn try_into_explain_physical_plan(
560 &self,
561 explain: &protobuf::ExplainExecNode,
562 _registry: &dyn FunctionRegistry,
563 _runtime: &RuntimeEnv,
564 _extension_codec: &dyn PhysicalExtensionCodec,
565 ) -> Result<Arc<dyn ExecutionPlan>> {
566 Ok(Arc::new(ExplainExec::new(
567 Arc::new(explain.schema.as_ref().unwrap().try_into()?),
568 explain
569 .stringified_plans
570 .iter()
571 .map(|plan| plan.into())
572 .collect(),
573 explain.verbose,
574 )))
575 }
576
577 fn try_into_projection_physical_plan(
578 &self,
579 projection: &protobuf::ProjectionExecNode,
580 registry: &dyn FunctionRegistry,
581 runtime: &RuntimeEnv,
582 extension_codec: &dyn PhysicalExtensionCodec,
583 ) -> Result<Arc<dyn ExecutionPlan>> {
584 let input: Arc<dyn ExecutionPlan> =
585 into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
586 let exprs = projection
587 .expr
588 .iter()
589 .zip(projection.expr_name.iter())
590 .map(|(expr, name)| {
591 Ok((
592 parse_physical_expr(
593 expr,
594 registry,
595 input.schema().as_ref(),
596 extension_codec,
597 )?,
598 name.to_string(),
599 ))
600 })
601 .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
602 Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
603 }
604
605 fn try_into_filter_physical_plan(
606 &self,
607 filter: &protobuf::FilterExecNode,
608 registry: &dyn FunctionRegistry,
609 runtime: &RuntimeEnv,
610 extension_codec: &dyn PhysicalExtensionCodec,
611 ) -> Result<Arc<dyn ExecutionPlan>> {
612 let input: Arc<dyn ExecutionPlan> =
613 into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
614
615 let predicate = filter
616 .expr
617 .as_ref()
618 .map(|expr| {
619 parse_physical_expr(
620 expr,
621 registry,
622 input.schema().as_ref(),
623 extension_codec,
624 )
625 })
626 .transpose()?
627 .ok_or_else(|| {
628 DataFusionError::Internal(
629 "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
630 )
631 })?;
632 let filter_selectivity = filter.default_filter_selectivity.try_into();
633 let projection = if !filter.projection.is_empty() {
634 Some(
635 filter
636 .projection
637 .iter()
638 .map(|i| *i as usize)
639 .collect::<Vec<_>>(),
640 )
641 } else {
642 None
643 };
644 let filter =
645 FilterExec::try_new(predicate, input)?.with_projection(projection)?;
646 match filter_selectivity {
647 Ok(filter_selectivity) => Ok(Arc::new(
648 filter.with_default_selectivity(filter_selectivity)?,
649 )),
650 Err(_) => Err(DataFusionError::Internal(
651 "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
652 )),
653 }
654 }
655
656 fn try_into_csv_scan_physical_plan(
657 &self,
658 scan: &protobuf::CsvScanExecNode,
659 registry: &dyn FunctionRegistry,
660 _runtime: &RuntimeEnv,
661 extension_codec: &dyn PhysicalExtensionCodec,
662 ) -> Result<Arc<dyn ExecutionPlan>> {
663 let escape =
664 if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
665 &scan.optional_escape
666 {
667 Some(str_to_byte(escape, "escape")?)
668 } else {
669 None
670 };
671
672 let comment = if let Some(
673 protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
674 ) = &scan.optional_comment
675 {
676 Some(str_to_byte(comment, "comment")?)
677 } else {
678 None
679 };
680
681 let source = Arc::new(
682 CsvSource::new(
683 scan.has_header,
684 str_to_byte(&scan.delimiter, "delimiter")?,
685 0,
686 )
687 .with_escape(escape)
688 .with_comment(comment),
689 );
690
691 let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
692 scan.base_conf.as_ref().unwrap(),
693 registry,
694 extension_codec,
695 source,
696 )?)
697 .with_newlines_in_values(scan.newlines_in_values)
698 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
699 .build();
700 Ok(DataSourceExec::from_data_source(conf))
701 }
702
703 fn try_into_json_scan_physical_plan(
704 &self,
705 scan: &protobuf::JsonScanExecNode,
706 registry: &dyn FunctionRegistry,
707 _runtime: &RuntimeEnv,
708 extension_codec: &dyn PhysicalExtensionCodec,
709 ) -> Result<Arc<dyn ExecutionPlan>> {
710 let scan_conf = parse_protobuf_file_scan_config(
711 scan.base_conf.as_ref().unwrap(),
712 registry,
713 extension_codec,
714 Arc::new(JsonSource::new()),
715 )?;
716 Ok(DataSourceExec::from_data_source(scan_conf))
717 }
718
719 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
720 fn try_into_parquet_scan_physical_plan(
721 &self,
722 scan: &protobuf::ParquetScanExecNode,
723 registry: &dyn FunctionRegistry,
724 _runtime: &RuntimeEnv,
725 extension_codec: &dyn PhysicalExtensionCodec,
726 ) -> Result<Arc<dyn ExecutionPlan>> {
727 #[cfg(feature = "parquet")]
728 {
729 let schema =
730 parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
731
732 let base_conf = scan.base_conf.as_ref().unwrap();
734 let predicate_schema = if !base_conf.projection.is_empty() {
735 let projected_fields: Vec<_> = base_conf
737 .projection
738 .iter()
739 .map(|&i| schema.field(i as usize).clone())
740 .collect();
741 Arc::new(Schema::new(projected_fields))
742 } else {
743 schema
744 };
745
746 let predicate = scan
747 .predicate
748 .as_ref()
749 .map(|expr| {
750 parse_physical_expr(
751 expr,
752 registry,
753 predicate_schema.as_ref(),
754 extension_codec,
755 )
756 })
757 .transpose()?;
758 let mut options = TableParquetOptions::default();
759
760 if let Some(table_options) = scan.parquet_options.as_ref() {
761 options = table_options.try_into()?;
762 }
763 let mut source = ParquetSource::new(options);
764
765 if let Some(predicate) = predicate {
766 source = source.with_predicate(predicate);
767 }
768 let base_config = parse_protobuf_file_scan_config(
769 base_conf,
770 registry,
771 extension_codec,
772 Arc::new(source),
773 )?;
774 Ok(DataSourceExec::from_data_source(base_config))
775 }
776 #[cfg(not(feature = "parquet"))]
777 panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
778 }
779
780 #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
781 fn try_into_avro_scan_physical_plan(
782 &self,
783 scan: &protobuf::AvroScanExecNode,
784 registry: &dyn FunctionRegistry,
785 _runtime: &RuntimeEnv,
786 extension_codec: &dyn PhysicalExtensionCodec,
787 ) -> Result<Arc<dyn ExecutionPlan>> {
788 #[cfg(feature = "avro")]
789 {
790 let conf = parse_protobuf_file_scan_config(
791 scan.base_conf.as_ref().unwrap(),
792 registry,
793 extension_codec,
794 Arc::new(AvroSource::new()),
795 )?;
796 Ok(DataSourceExec::from_data_source(conf))
797 }
798 #[cfg(not(feature = "avro"))]
799 panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
800 }
801
802 fn try_into_coalesce_batches_physical_plan(
803 &self,
804 coalesce_batches: &protobuf::CoalesceBatchesExecNode,
805 registry: &dyn FunctionRegistry,
806 runtime: &RuntimeEnv,
807 extension_codec: &dyn PhysicalExtensionCodec,
808 ) -> Result<Arc<dyn ExecutionPlan>> {
809 let input: Arc<dyn ExecutionPlan> = into_physical_plan(
810 &coalesce_batches.input,
811 registry,
812 runtime,
813 extension_codec,
814 )?;
815 Ok(Arc::new(
816 CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
817 .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
818 ))
819 }
820
821 fn try_into_merge_physical_plan(
822 &self,
823 merge: &protobuf::CoalescePartitionsExecNode,
824 registry: &dyn FunctionRegistry,
825 runtime: &RuntimeEnv,
826 extension_codec: &dyn PhysicalExtensionCodec,
827 ) -> Result<Arc<dyn ExecutionPlan>> {
828 let input: Arc<dyn ExecutionPlan> =
829 into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
830 Ok(Arc::new(
831 CoalescePartitionsExec::new(input)
832 .with_fetch(merge.fetch.map(|f| f as usize)),
833 ))
834 }
835
836 fn try_into_repartition_physical_plan(
837 &self,
838 repart: &protobuf::RepartitionExecNode,
839 registry: &dyn FunctionRegistry,
840 runtime: &RuntimeEnv,
841 extension_codec: &dyn PhysicalExtensionCodec,
842 ) -> Result<Arc<dyn ExecutionPlan>> {
843 let input: Arc<dyn ExecutionPlan> =
844 into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
845 let partitioning = parse_protobuf_partitioning(
846 repart.partitioning.as_ref(),
847 registry,
848 input.schema().as_ref(),
849 extension_codec,
850 )?;
851 Ok(Arc::new(RepartitionExec::try_new(
852 input,
853 partitioning.unwrap(),
854 )?))
855 }
856
857 fn try_into_global_limit_physical_plan(
858 &self,
859 limit: &protobuf::GlobalLimitExecNode,
860 registry: &dyn FunctionRegistry,
861 runtime: &RuntimeEnv,
862 extension_codec: &dyn PhysicalExtensionCodec,
863 ) -> Result<Arc<dyn ExecutionPlan>> {
864 let input: Arc<dyn ExecutionPlan> =
865 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
866 let fetch = if limit.fetch >= 0 {
867 Some(limit.fetch as usize)
868 } else {
869 None
870 };
871 Ok(Arc::new(GlobalLimitExec::new(
872 input,
873 limit.skip as usize,
874 fetch,
875 )))
876 }
877
878 fn try_into_local_limit_physical_plan(
879 &self,
880 limit: &protobuf::LocalLimitExecNode,
881 registry: &dyn FunctionRegistry,
882 runtime: &RuntimeEnv,
883 extension_codec: &dyn PhysicalExtensionCodec,
884 ) -> Result<Arc<dyn ExecutionPlan>> {
885 let input: Arc<dyn ExecutionPlan> =
886 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
887 Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
888 }
889
890 fn try_into_window_physical_plan(
891 &self,
892 window_agg: &protobuf::WindowAggExecNode,
893 registry: &dyn FunctionRegistry,
894 runtime: &RuntimeEnv,
895 extension_codec: &dyn PhysicalExtensionCodec,
896 ) -> Result<Arc<dyn ExecutionPlan>> {
897 let input: Arc<dyn ExecutionPlan> =
898 into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
899 let input_schema = input.schema();
900
901 let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
902 .window_expr
903 .iter()
904 .map(|window_expr| {
905 parse_physical_window_expr(
906 window_expr,
907 registry,
908 input_schema.as_ref(),
909 extension_codec,
910 )
911 })
912 .collect::<Result<Vec<_>, _>>()?;
913
914 let partition_keys = window_agg
915 .partition_keys
916 .iter()
917 .map(|expr| {
918 parse_physical_expr(
919 expr,
920 registry,
921 input.schema().as_ref(),
922 extension_codec,
923 )
924 })
925 .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
926
927 if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
928 let input_order_mode = match input_order_mode {
929 window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
930 window_agg_exec_node::InputOrderMode::PartiallySorted(
931 protobuf::PartiallySortedInputOrderMode { columns },
932 ) => InputOrderMode::PartiallySorted(
933 columns.iter().map(|c| *c as usize).collect(),
934 ),
935 window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
936 };
937
938 Ok(Arc::new(BoundedWindowAggExec::try_new(
939 physical_window_expr,
940 input,
941 input_order_mode,
942 !partition_keys.is_empty(),
943 )?))
944 } else {
945 Ok(Arc::new(WindowAggExec::try_new(
946 physical_window_expr,
947 input,
948 !partition_keys.is_empty(),
949 )?))
950 }
951 }
952
953 fn try_into_aggregate_physical_plan(
954 &self,
955 hash_agg: &protobuf::AggregateExecNode,
956 registry: &dyn FunctionRegistry,
957 runtime: &RuntimeEnv,
958 extension_codec: &dyn PhysicalExtensionCodec,
959 ) -> Result<Arc<dyn ExecutionPlan>> {
960 let input: Arc<dyn ExecutionPlan> =
961 into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
962 let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
963 proto_error(format!(
964 "Received a AggregateNode message with unknown AggregateMode {}",
965 hash_agg.mode
966 ))
967 })?;
968 let agg_mode: AggregateMode = match mode {
969 protobuf::AggregateMode::Partial => AggregateMode::Partial,
970 protobuf::AggregateMode::Final => AggregateMode::Final,
971 protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
972 protobuf::AggregateMode::Single => AggregateMode::Single,
973 protobuf::AggregateMode::SinglePartitioned => {
974 AggregateMode::SinglePartitioned
975 }
976 };
977
978 let num_expr = hash_agg.group_expr.len();
979
980 let group_expr = hash_agg
981 .group_expr
982 .iter()
983 .zip(hash_agg.group_expr_name.iter())
984 .map(|(expr, name)| {
985 parse_physical_expr(
986 expr,
987 registry,
988 input.schema().as_ref(),
989 extension_codec,
990 )
991 .map(|expr| (expr, name.to_string()))
992 })
993 .collect::<Result<Vec<_>, _>>()?;
994
995 let null_expr = hash_agg
996 .null_expr
997 .iter()
998 .zip(hash_agg.group_expr_name.iter())
999 .map(|(expr, name)| {
1000 parse_physical_expr(
1001 expr,
1002 registry,
1003 input.schema().as_ref(),
1004 extension_codec,
1005 )
1006 .map(|expr| (expr, name.to_string()))
1007 })
1008 .collect::<Result<Vec<_>, _>>()?;
1009
1010 let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
1011 hash_agg
1012 .groups
1013 .chunks(num_expr)
1014 .map(|g| g.to_vec())
1015 .collect::<Vec<Vec<bool>>>()
1016 } else {
1017 vec![]
1018 };
1019
1020 let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
1021 DataFusionError::Internal(
1022 "input_schema in AggregateNode is missing.".to_owned(),
1023 )
1024 })?;
1025 let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
1026
1027 let physical_filter_expr = hash_agg
1028 .filter_expr
1029 .iter()
1030 .map(|expr| {
1031 expr.expr
1032 .as_ref()
1033 .map(|e| {
1034 parse_physical_expr(
1035 e,
1036 registry,
1037 &physical_schema,
1038 extension_codec,
1039 )
1040 })
1041 .transpose()
1042 })
1043 .collect::<Result<Vec<_>, _>>()?;
1044
1045 let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1046 .aggr_expr
1047 .iter()
1048 .zip(hash_agg.aggr_expr_name.iter())
1049 .map(|(expr, name)| {
1050 let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1051 proto_error("Unexpected empty aggregate physical expression")
1052 })?;
1053
1054 match expr_type {
1055 ExprType::AggregateExpr(agg_node) => {
1056 let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1057 .expr
1058 .iter()
1059 .map(|e| {
1060 parse_physical_expr(
1061 e,
1062 registry,
1063 &physical_schema,
1064 extension_codec,
1065 )
1066 })
1067 .collect::<Result<Vec<_>>>()?;
1068 let order_bys = agg_node
1069 .ordering_req
1070 .iter()
1071 .map(|e| {
1072 parse_physical_sort_expr(
1073 e,
1074 registry,
1075 &physical_schema,
1076 extension_codec,
1077 )
1078 })
1079 .collect::<Result<_>>()?;
1080 agg_node
1081 .aggregate_function
1082 .as_ref()
1083 .map(|func| match func {
1084 AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1085 let agg_udf = match &agg_node.fun_definition {
1086 Some(buf) => extension_codec
1087 .try_decode_udaf(udaf_name, buf)?,
1088 None => {
1089 registry.udaf(udaf_name).or_else(|_| {
1090 extension_codec
1091 .try_decode_udaf(udaf_name, &[])
1092 })?
1093 }
1094 };
1095
1096 AggregateExprBuilder::new(agg_udf, input_phy_expr)
1097 .schema(Arc::clone(&physical_schema))
1098 .alias(name)
1099 .with_ignore_nulls(agg_node.ignore_nulls)
1100 .with_distinct(agg_node.distinct)
1101 .order_by(order_bys)
1102 .build()
1103 .map(Arc::new)
1104 }
1105 })
1106 .transpose()?
1107 .ok_or_else(|| {
1108 proto_error(
1109 "Invalid AggregateExpr, missing aggregate_function",
1110 )
1111 })
1112 }
1113 _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1114 }
1115 })
1116 .collect::<Result<Vec<_>, _>>()?;
1117
1118 let limit = hash_agg
1119 .limit
1120 .as_ref()
1121 .map(|lit_value| lit_value.limit as usize);
1122
1123 let agg = AggregateExec::try_new(
1124 agg_mode,
1125 PhysicalGroupBy::new(group_expr, null_expr, groups),
1126 physical_aggr_expr,
1127 physical_filter_expr,
1128 input,
1129 physical_schema,
1130 )?;
1131
1132 let agg = agg.with_limit(limit);
1133
1134 Ok(Arc::new(agg))
1135 }
1136
1137 fn try_into_hash_join_physical_plan(
1138 &self,
1139 hashjoin: &protobuf::HashJoinExecNode,
1140 registry: &dyn FunctionRegistry,
1141 runtime: &RuntimeEnv,
1142 extension_codec: &dyn PhysicalExtensionCodec,
1143 ) -> Result<Arc<dyn ExecutionPlan>> {
1144 let left: Arc<dyn ExecutionPlan> =
1145 into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
1146 let right: Arc<dyn ExecutionPlan> =
1147 into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
1148 let left_schema = left.schema();
1149 let right_schema = right.schema();
1150 let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1151 .on
1152 .iter()
1153 .map(|col| {
1154 let left = parse_physical_expr(
1155 &col.left.clone().unwrap(),
1156 registry,
1157 left_schema.as_ref(),
1158 extension_codec,
1159 )?;
1160 let right = parse_physical_expr(
1161 &col.right.clone().unwrap(),
1162 registry,
1163 right_schema.as_ref(),
1164 extension_codec,
1165 )?;
1166 Ok((left, right))
1167 })
1168 .collect::<Result<_>>()?;
1169 let join_type =
1170 protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1171 proto_error(format!(
1172 "Received a HashJoinNode message with unknown JoinType {}",
1173 hashjoin.join_type
1174 ))
1175 })?;
1176 let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1177 .map_err(|_| {
1178 proto_error(format!(
1179 "Received a HashJoinNode message with unknown NullEquality {}",
1180 hashjoin.null_equality
1181 ))
1182 })?;
1183 let filter = hashjoin
1184 .filter
1185 .as_ref()
1186 .map(|f| {
1187 let schema = f
1188 .schema
1189 .as_ref()
1190 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1191 .try_into()?;
1192
1193 let expression = parse_physical_expr(
1194 f.expression.as_ref().ok_or_else(|| {
1195 proto_error("Unexpected empty filter expression")
1196 })?,
1197 registry, &schema,
1198 extension_codec,
1199 )?;
1200 let column_indices = f.column_indices
1201 .iter()
1202 .map(|i| {
1203 let side = protobuf::JoinSide::try_from(i.side)
1204 .map_err(|_| proto_error(format!(
1205 "Received a HashJoinNode message with JoinSide in Filter {}",
1206 i.side))
1207 )?;
1208
1209 Ok(ColumnIndex {
1210 index: i.index as usize,
1211 side: side.into(),
1212 })
1213 })
1214 .collect::<Result<Vec<_>>>()?;
1215
1216 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1217 })
1218 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1219
1220 let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1221 .map_err(|_| {
1222 proto_error(format!(
1223 "Received a HashJoinNode message with unknown PartitionMode {}",
1224 hashjoin.partition_mode
1225 ))
1226 })?;
1227 let partition_mode = match partition_mode {
1228 protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1229 protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1230 protobuf::PartitionMode::Auto => PartitionMode::Auto,
1231 };
1232 let projection = if !hashjoin.projection.is_empty() {
1233 Some(
1234 hashjoin
1235 .projection
1236 .iter()
1237 .map(|i| *i as usize)
1238 .collect::<Vec<_>>(),
1239 )
1240 } else {
1241 None
1242 };
1243 Ok(Arc::new(HashJoinExec::try_new(
1244 left,
1245 right,
1246 on,
1247 filter,
1248 &join_type.into(),
1249 projection,
1250 partition_mode,
1251 null_equality.into(),
1252 )?))
1253 }
1254
1255 fn try_into_symmetric_hash_join_physical_plan(
1256 &self,
1257 sym_join: &protobuf::SymmetricHashJoinExecNode,
1258 registry: &dyn FunctionRegistry,
1259 runtime: &RuntimeEnv,
1260 extension_codec: &dyn PhysicalExtensionCodec,
1261 ) -> Result<Arc<dyn ExecutionPlan>> {
1262 let left =
1263 into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1264 let right =
1265 into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1266 let left_schema = left.schema();
1267 let right_schema = right.schema();
1268 let on = sym_join
1269 .on
1270 .iter()
1271 .map(|col| {
1272 let left = parse_physical_expr(
1273 &col.left.clone().unwrap(),
1274 registry,
1275 left_schema.as_ref(),
1276 extension_codec,
1277 )?;
1278 let right = parse_physical_expr(
1279 &col.right.clone().unwrap(),
1280 registry,
1281 right_schema.as_ref(),
1282 extension_codec,
1283 )?;
1284 Ok((left, right))
1285 })
1286 .collect::<Result<_>>()?;
1287 let join_type =
1288 protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1289 proto_error(format!(
1290 "Received a SymmetricHashJoin message with unknown JoinType {}",
1291 sym_join.join_type
1292 ))
1293 })?;
1294 let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1295 .map_err(|_| {
1296 proto_error(format!(
1297 "Received a SymmetricHashJoin message with unknown NullEquality {}",
1298 sym_join.null_equality
1299 ))
1300 })?;
1301 let filter = sym_join
1302 .filter
1303 .as_ref()
1304 .map(|f| {
1305 let schema = f
1306 .schema
1307 .as_ref()
1308 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1309 .try_into()?;
1310
1311 let expression = parse_physical_expr(
1312 f.expression.as_ref().ok_or_else(|| {
1313 proto_error("Unexpected empty filter expression")
1314 })?,
1315 registry, &schema,
1316 extension_codec,
1317 )?;
1318 let column_indices = f.column_indices
1319 .iter()
1320 .map(|i| {
1321 let side = protobuf::JoinSide::try_from(i.side)
1322 .map_err(|_| proto_error(format!(
1323 "Received a HashJoinNode message with JoinSide in Filter {}",
1324 i.side))
1325 )?;
1326
1327 Ok(ColumnIndex {
1328 index: i.index as usize,
1329 side: side.into(),
1330 })
1331 })
1332 .collect::<Result<_>>()?;
1333
1334 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1335 })
1336 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1337
1338 let left_sort_exprs = parse_physical_sort_exprs(
1339 &sym_join.left_sort_exprs,
1340 registry,
1341 &left_schema,
1342 extension_codec,
1343 )?;
1344 let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1345
1346 let right_sort_exprs = parse_physical_sort_exprs(
1347 &sym_join.right_sort_exprs,
1348 registry,
1349 &right_schema,
1350 extension_codec,
1351 )?;
1352 let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1353
1354 let partition_mode = protobuf::StreamPartitionMode::try_from(
1355 sym_join.partition_mode,
1356 )
1357 .map_err(|_| {
1358 proto_error(format!(
1359 "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1360 sym_join.partition_mode
1361 ))
1362 })?;
1363 let partition_mode = match partition_mode {
1364 protobuf::StreamPartitionMode::SinglePartition => {
1365 StreamJoinPartitionMode::SinglePartition
1366 }
1367 protobuf::StreamPartitionMode::PartitionedExec => {
1368 StreamJoinPartitionMode::Partitioned
1369 }
1370 };
1371 SymmetricHashJoinExec::try_new(
1372 left,
1373 right,
1374 on,
1375 filter,
1376 &join_type.into(),
1377 null_equality.into(),
1378 left_sort_exprs,
1379 right_sort_exprs,
1380 partition_mode,
1381 )
1382 .map(|e| Arc::new(e) as _)
1383 }
1384
1385 fn try_into_union_physical_plan(
1386 &self,
1387 union: &protobuf::UnionExecNode,
1388 registry: &dyn FunctionRegistry,
1389 runtime: &RuntimeEnv,
1390 extension_codec: &dyn PhysicalExtensionCodec,
1391 ) -> Result<Arc<dyn ExecutionPlan>> {
1392 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1393 for input in &union.inputs {
1394 inputs.push(input.try_into_physical_plan(
1395 registry,
1396 runtime,
1397 extension_codec,
1398 )?);
1399 }
1400 Ok(Arc::new(UnionExec::new(inputs)))
1401 }
1402
1403 fn try_into_interleave_physical_plan(
1404 &self,
1405 interleave: &protobuf::InterleaveExecNode,
1406 registry: &dyn FunctionRegistry,
1407 runtime: &RuntimeEnv,
1408 extension_codec: &dyn PhysicalExtensionCodec,
1409 ) -> Result<Arc<dyn ExecutionPlan>> {
1410 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1411 for input in &interleave.inputs {
1412 inputs.push(input.try_into_physical_plan(
1413 registry,
1414 runtime,
1415 extension_codec,
1416 )?);
1417 }
1418 Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1419 }
1420
1421 fn try_into_cross_join_physical_plan(
1422 &self,
1423 crossjoin: &protobuf::CrossJoinExecNode,
1424 registry: &dyn FunctionRegistry,
1425 runtime: &RuntimeEnv,
1426 extension_codec: &dyn PhysicalExtensionCodec,
1427 ) -> Result<Arc<dyn ExecutionPlan>> {
1428 let left: Arc<dyn ExecutionPlan> =
1429 into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1430 let right: Arc<dyn ExecutionPlan> =
1431 into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1432 Ok(Arc::new(CrossJoinExec::new(left, right)))
1433 }
1434
1435 fn try_into_empty_physical_plan(
1436 &self,
1437 empty: &protobuf::EmptyExecNode,
1438 _registry: &dyn FunctionRegistry,
1439 _runtime: &RuntimeEnv,
1440 _extension_codec: &dyn PhysicalExtensionCodec,
1441 ) -> Result<Arc<dyn ExecutionPlan>> {
1442 let schema = Arc::new(convert_required!(empty.schema)?);
1443 Ok(Arc::new(EmptyExec::new(schema)))
1444 }
1445
1446 fn try_into_placeholder_row_physical_plan(
1447 &self,
1448 placeholder: &protobuf::PlaceholderRowExecNode,
1449 _registry: &dyn FunctionRegistry,
1450 _runtime: &RuntimeEnv,
1451 _extension_codec: &dyn PhysicalExtensionCodec,
1452 ) -> Result<Arc<dyn ExecutionPlan>> {
1453 let schema = Arc::new(convert_required!(placeholder.schema)?);
1454 Ok(Arc::new(PlaceholderRowExec::new(schema)))
1455 }
1456
1457 fn try_into_sort_physical_plan(
1458 &self,
1459 sort: &protobuf::SortExecNode,
1460 registry: &dyn FunctionRegistry,
1461 runtime: &RuntimeEnv,
1462 extension_codec: &dyn PhysicalExtensionCodec,
1463 ) -> Result<Arc<dyn ExecutionPlan>> {
1464 let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1465 let exprs = sort
1466 .expr
1467 .iter()
1468 .map(|expr| {
1469 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1470 proto_error(format!(
1471 "physical_plan::from_proto() Unexpected expr {self:?}"
1472 ))
1473 })?;
1474 if let ExprType::Sort(sort_expr) = expr {
1475 let expr = sort_expr
1476 .expr
1477 .as_ref()
1478 .ok_or_else(|| {
1479 proto_error(format!(
1480 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1481 ))
1482 })?
1483 .as_ref();
1484 Ok(PhysicalSortExpr {
1485 expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
1486 options: SortOptions {
1487 descending: !sort_expr.asc,
1488 nulls_first: sort_expr.nulls_first,
1489 },
1490 })
1491 } else {
1492 internal_err!(
1493 "physical_plan::from_proto() {self:?}"
1494 )
1495 }
1496 })
1497 .collect::<Result<Vec<_>>>()?;
1498 let Some(ordering) = LexOrdering::new(exprs) else {
1499 return internal_err!("SortExec requires an ordering");
1500 };
1501 let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1502 let new_sort = SortExec::new(ordering, input)
1503 .with_fetch(fetch)
1504 .with_preserve_partitioning(sort.preserve_partitioning);
1505
1506 Ok(Arc::new(new_sort))
1507 }
1508
1509 fn try_into_sort_preserving_merge_physical_plan(
1510 &self,
1511 sort: &protobuf::SortPreservingMergeExecNode,
1512 registry: &dyn FunctionRegistry,
1513 runtime: &RuntimeEnv,
1514 extension_codec: &dyn PhysicalExtensionCodec,
1515 ) -> Result<Arc<dyn ExecutionPlan>> {
1516 let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1517 let exprs = sort
1518 .expr
1519 .iter()
1520 .map(|expr| {
1521 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1522 proto_error(format!(
1523 "physical_plan::from_proto() Unexpected expr {self:?}"
1524 ))
1525 })?;
1526 if let ExprType::Sort(sort_expr) = expr {
1527 let expr = sort_expr
1528 .expr
1529 .as_ref()
1530 .ok_or_else(|| {
1531 proto_error(format!(
1532 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1533 ))
1534 })?
1535 .as_ref();
1536 Ok(PhysicalSortExpr {
1537 expr: parse_physical_expr(
1538 expr,
1539 registry,
1540 input.schema().as_ref(),
1541 extension_codec,
1542 )?,
1543 options: SortOptions {
1544 descending: !sort_expr.asc,
1545 nulls_first: sort_expr.nulls_first,
1546 },
1547 })
1548 } else {
1549 internal_err!("physical_plan::from_proto() {self:?}")
1550 }
1551 })
1552 .collect::<Result<Vec<_>>>()?;
1553 let Some(ordering) = LexOrdering::new(exprs) else {
1554 return internal_err!("SortExec requires an ordering");
1555 };
1556 let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1557 Ok(Arc::new(
1558 SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1559 ))
1560 }
1561
1562 fn try_into_extension_physical_plan(
1563 &self,
1564 extension: &protobuf::PhysicalExtensionNode,
1565 registry: &dyn FunctionRegistry,
1566 runtime: &RuntimeEnv,
1567 extension_codec: &dyn PhysicalExtensionCodec,
1568 ) -> Result<Arc<dyn ExecutionPlan>> {
1569 let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1570 .inputs
1571 .iter()
1572 .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1573 .collect::<Result<_>>()?;
1574
1575 let extension_node =
1576 extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1577
1578 Ok(extension_node)
1579 }
1580
1581 fn try_into_nested_loop_join_physical_plan(
1582 &self,
1583 join: &protobuf::NestedLoopJoinExecNode,
1584 registry: &dyn FunctionRegistry,
1585 runtime: &RuntimeEnv,
1586 extension_codec: &dyn PhysicalExtensionCodec,
1587 ) -> Result<Arc<dyn ExecutionPlan>> {
1588 let left: Arc<dyn ExecutionPlan> =
1589 into_physical_plan(&join.left, registry, runtime, extension_codec)?;
1590 let right: Arc<dyn ExecutionPlan> =
1591 into_physical_plan(&join.right, registry, runtime, extension_codec)?;
1592 let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1593 proto_error(format!(
1594 "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1595 join.join_type
1596 ))
1597 })?;
1598 let filter = join
1599 .filter
1600 .as_ref()
1601 .map(|f| {
1602 let schema = f
1603 .schema
1604 .as_ref()
1605 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1606 .try_into()?;
1607
1608 let expression = parse_physical_expr(
1609 f.expression.as_ref().ok_or_else(|| {
1610 proto_error("Unexpected empty filter expression")
1611 })?,
1612 registry, &schema,
1613 extension_codec,
1614 )?;
1615 let column_indices = f.column_indices
1616 .iter()
1617 .map(|i| {
1618 let side = protobuf::JoinSide::try_from(i.side)
1619 .map_err(|_| proto_error(format!(
1620 "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1621 i.side))
1622 )?;
1623
1624 Ok(ColumnIndex {
1625 index: i.index as usize,
1626 side: side.into(),
1627 })
1628 })
1629 .collect::<Result<Vec<_>>>()?;
1630
1631 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1632 })
1633 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1634
1635 let projection = if !join.projection.is_empty() {
1636 Some(
1637 join.projection
1638 .iter()
1639 .map(|i| *i as usize)
1640 .collect::<Vec<_>>(),
1641 )
1642 } else {
1643 None
1644 };
1645
1646 Ok(Arc::new(NestedLoopJoinExec::try_new(
1647 left,
1648 right,
1649 filter,
1650 &join_type.into(),
1651 projection,
1652 )?))
1653 }
1654
1655 fn try_into_analyze_physical_plan(
1656 &self,
1657 analyze: &protobuf::AnalyzeExecNode,
1658 registry: &dyn FunctionRegistry,
1659 runtime: &RuntimeEnv,
1660 extension_codec: &dyn PhysicalExtensionCodec,
1661 ) -> Result<Arc<dyn ExecutionPlan>> {
1662 let input: Arc<dyn ExecutionPlan> =
1663 into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1664 Ok(Arc::new(AnalyzeExec::new(
1665 analyze.verbose,
1666 analyze.show_statistics,
1667 input,
1668 Arc::new(convert_required!(analyze.schema)?),
1669 )))
1670 }
1671
1672 fn try_into_json_sink_physical_plan(
1673 &self,
1674 sink: &protobuf::JsonSinkExecNode,
1675 registry: &dyn FunctionRegistry,
1676 runtime: &RuntimeEnv,
1677 extension_codec: &dyn PhysicalExtensionCodec,
1678 ) -> Result<Arc<dyn ExecutionPlan>> {
1679 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1680
1681 let data_sink: JsonSink = sink
1682 .sink
1683 .as_ref()
1684 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1685 .try_into()?;
1686 let sink_schema = input.schema();
1687 let sort_order = sink
1688 .sort_order
1689 .as_ref()
1690 .map(|collection| {
1691 parse_physical_sort_exprs(
1692 &collection.physical_sort_expr_nodes,
1693 registry,
1694 &sink_schema,
1695 extension_codec,
1696 )
1697 .map(|sort_exprs| {
1698 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1699 })
1700 })
1701 .transpose()?
1702 .flatten();
1703 Ok(Arc::new(DataSinkExec::new(
1704 input,
1705 Arc::new(data_sink),
1706 sort_order,
1707 )))
1708 }
1709
1710 fn try_into_csv_sink_physical_plan(
1711 &self,
1712 sink: &protobuf::CsvSinkExecNode,
1713 registry: &dyn FunctionRegistry,
1714 runtime: &RuntimeEnv,
1715 extension_codec: &dyn PhysicalExtensionCodec,
1716 ) -> Result<Arc<dyn ExecutionPlan>> {
1717 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1718
1719 let data_sink: CsvSink = sink
1720 .sink
1721 .as_ref()
1722 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1723 .try_into()?;
1724 let sink_schema = input.schema();
1725 let sort_order = sink
1726 .sort_order
1727 .as_ref()
1728 .map(|collection| {
1729 parse_physical_sort_exprs(
1730 &collection.physical_sort_expr_nodes,
1731 registry,
1732 &sink_schema,
1733 extension_codec,
1734 )
1735 .map(|sort_exprs| {
1736 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1737 })
1738 })
1739 .transpose()?
1740 .flatten();
1741 Ok(Arc::new(DataSinkExec::new(
1742 input,
1743 Arc::new(data_sink),
1744 sort_order,
1745 )))
1746 }
1747
1748 fn try_into_parquet_sink_physical_plan(
1749 &self,
1750 sink: &protobuf::ParquetSinkExecNode,
1751 registry: &dyn FunctionRegistry,
1752 runtime: &RuntimeEnv,
1753 extension_codec: &dyn PhysicalExtensionCodec,
1754 ) -> Result<Arc<dyn ExecutionPlan>> {
1755 #[cfg(feature = "parquet")]
1756 {
1757 let input =
1758 into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1759
1760 let data_sink: ParquetSink = sink
1761 .sink
1762 .as_ref()
1763 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1764 .try_into()?;
1765 let sink_schema = input.schema();
1766 let sort_order = sink
1767 .sort_order
1768 .as_ref()
1769 .map(|collection| {
1770 parse_physical_sort_exprs(
1771 &collection.physical_sort_expr_nodes,
1772 registry,
1773 &sink_schema,
1774 extension_codec,
1775 )
1776 .map(|sort_exprs| {
1777 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1778 })
1779 })
1780 .transpose()?
1781 .flatten();
1782 Ok(Arc::new(DataSinkExec::new(
1783 input,
1784 Arc::new(data_sink),
1785 sort_order,
1786 )))
1787 }
1788 #[cfg(not(feature = "parquet"))]
1789 panic!("Trying to use ParquetSink without `parquet` feature enabled");
1790 }
1791
1792 fn try_into_unnest_physical_plan(
1793 &self,
1794 unnest: &protobuf::UnnestExecNode,
1795 registry: &dyn FunctionRegistry,
1796 runtime: &RuntimeEnv,
1797 extension_codec: &dyn PhysicalExtensionCodec,
1798 ) -> Result<Arc<dyn ExecutionPlan>> {
1799 let input =
1800 into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1801
1802 Ok(Arc::new(UnnestExec::new(
1803 input,
1804 unnest
1805 .list_type_columns
1806 .iter()
1807 .map(|c| ListUnnest {
1808 index_in_input_schema: c.index_in_input_schema as _,
1809 depth: c.depth as _,
1810 })
1811 .collect(),
1812 unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1813 Arc::new(convert_required!(unnest.schema)?),
1814 into_required!(unnest.options)?,
1815 )))
1816 }
1817
1818 fn try_into_cooperative_physical_plan(
1819 &self,
1820 field_stream: &protobuf::CooperativeExecNode,
1821 registry: &dyn FunctionRegistry,
1822 runtime: &RuntimeEnv,
1823 extension_codec: &dyn PhysicalExtensionCodec,
1824 ) -> Result<Arc<dyn ExecutionPlan>> {
1825 let input =
1826 into_physical_plan(&field_stream.input, registry, runtime, extension_codec)?;
1827 Ok(Arc::new(CooperativeExec::new(input)))
1828 }
1829
1830 fn try_from_explain_exec(
1831 exec: &ExplainExec,
1832 _extension_codec: &dyn PhysicalExtensionCodec,
1833 ) -> Result<Self> {
1834 Ok(protobuf::PhysicalPlanNode {
1835 physical_plan_type: Some(PhysicalPlanType::Explain(
1836 protobuf::ExplainExecNode {
1837 schema: Some(exec.schema().as_ref().try_into()?),
1838 stringified_plans: exec
1839 .stringified_plans()
1840 .iter()
1841 .map(|plan| plan.into())
1842 .collect(),
1843 verbose: exec.verbose(),
1844 },
1845 )),
1846 })
1847 }
1848
1849 fn try_from_projection_exec(
1850 exec: &ProjectionExec,
1851 extension_codec: &dyn PhysicalExtensionCodec,
1852 ) -> Result<Self> {
1853 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1854 exec.input().to_owned(),
1855 extension_codec,
1856 )?;
1857 let expr = exec
1858 .expr()
1859 .iter()
1860 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1861 .collect::<Result<Vec<_>>>()?;
1862 let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1863 Ok(protobuf::PhysicalPlanNode {
1864 physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1865 protobuf::ProjectionExecNode {
1866 input: Some(Box::new(input)),
1867 expr,
1868 expr_name,
1869 },
1870 ))),
1871 })
1872 }
1873
1874 fn try_from_analyze_exec(
1875 exec: &AnalyzeExec,
1876 extension_codec: &dyn PhysicalExtensionCodec,
1877 ) -> Result<Self> {
1878 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1879 exec.input().to_owned(),
1880 extension_codec,
1881 )?;
1882 Ok(protobuf::PhysicalPlanNode {
1883 physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1884 protobuf::AnalyzeExecNode {
1885 verbose: exec.verbose(),
1886 show_statistics: exec.show_statistics(),
1887 input: Some(Box::new(input)),
1888 schema: Some(exec.schema().as_ref().try_into()?),
1889 },
1890 ))),
1891 })
1892 }
1893
1894 fn try_from_filter_exec(
1895 exec: &FilterExec,
1896 extension_codec: &dyn PhysicalExtensionCodec,
1897 ) -> Result<Self> {
1898 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1899 exec.input().to_owned(),
1900 extension_codec,
1901 )?;
1902 Ok(protobuf::PhysicalPlanNode {
1903 physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1904 protobuf::FilterExecNode {
1905 input: Some(Box::new(input)),
1906 expr: Some(serialize_physical_expr(
1907 exec.predicate(),
1908 extension_codec,
1909 )?),
1910 default_filter_selectivity: exec.default_selectivity() as u32,
1911 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1912 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1913 }),
1914 },
1915 ))),
1916 })
1917 }
1918
1919 fn try_from_global_limit_exec(
1920 limit: &GlobalLimitExec,
1921 extension_codec: &dyn PhysicalExtensionCodec,
1922 ) -> Result<Self> {
1923 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1924 limit.input().to_owned(),
1925 extension_codec,
1926 )?;
1927
1928 Ok(protobuf::PhysicalPlanNode {
1929 physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1930 protobuf::GlobalLimitExecNode {
1931 input: Some(Box::new(input)),
1932 skip: limit.skip() as u32,
1933 fetch: match limit.fetch() {
1934 Some(n) => n as i64,
1935 _ => -1, },
1937 },
1938 ))),
1939 })
1940 }
1941
1942 fn try_from_local_limit_exec(
1943 limit: &LocalLimitExec,
1944 extension_codec: &dyn PhysicalExtensionCodec,
1945 ) -> Result<Self> {
1946 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1947 limit.input().to_owned(),
1948 extension_codec,
1949 )?;
1950 Ok(protobuf::PhysicalPlanNode {
1951 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1952 protobuf::LocalLimitExecNode {
1953 input: Some(Box::new(input)),
1954 fetch: limit.fetch() as u32,
1955 },
1956 ))),
1957 })
1958 }
1959
1960 fn try_from_hash_join_exec(
1961 exec: &HashJoinExec,
1962 extension_codec: &dyn PhysicalExtensionCodec,
1963 ) -> Result<Self> {
1964 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1965 exec.left().to_owned(),
1966 extension_codec,
1967 )?;
1968 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1969 exec.right().to_owned(),
1970 extension_codec,
1971 )?;
1972 let on: Vec<protobuf::JoinOn> = exec
1973 .on()
1974 .iter()
1975 .map(|tuple| {
1976 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1977 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1978 Ok::<_, DataFusionError>(protobuf::JoinOn {
1979 left: Some(l),
1980 right: Some(r),
1981 })
1982 })
1983 .collect::<Result<_>>()?;
1984 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1985 let null_equality: protobuf::NullEquality = exec.null_equality().into();
1986 let filter = exec
1987 .filter()
1988 .as_ref()
1989 .map(|f| {
1990 let expression =
1991 serialize_physical_expr(f.expression(), extension_codec)?;
1992 let column_indices = f
1993 .column_indices()
1994 .iter()
1995 .map(|i| {
1996 let side: protobuf::JoinSide = i.side.to_owned().into();
1997 protobuf::ColumnIndex {
1998 index: i.index as u32,
1999 side: side.into(),
2000 }
2001 })
2002 .collect();
2003 let schema = f.schema().as_ref().try_into()?;
2004 Ok(protobuf::JoinFilter {
2005 expression: Some(expression),
2006 column_indices,
2007 schema: Some(schema),
2008 })
2009 })
2010 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2011
2012 let partition_mode = match exec.partition_mode() {
2013 PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2014 PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2015 PartitionMode::Auto => protobuf::PartitionMode::Auto,
2016 };
2017
2018 Ok(protobuf::PhysicalPlanNode {
2019 physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2020 protobuf::HashJoinExecNode {
2021 left: Some(Box::new(left)),
2022 right: Some(Box::new(right)),
2023 on,
2024 join_type: join_type.into(),
2025 partition_mode: partition_mode.into(),
2026 null_equality: null_equality.into(),
2027 filter,
2028 projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2029 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2030 }),
2031 },
2032 ))),
2033 })
2034 }
2035
2036 fn try_from_symmetric_hash_join_exec(
2037 exec: &SymmetricHashJoinExec,
2038 extension_codec: &dyn PhysicalExtensionCodec,
2039 ) -> Result<Self> {
2040 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2041 exec.left().to_owned(),
2042 extension_codec,
2043 )?;
2044 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2045 exec.right().to_owned(),
2046 extension_codec,
2047 )?;
2048 let on = exec
2049 .on()
2050 .iter()
2051 .map(|tuple| {
2052 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2053 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2054 Ok::<_, DataFusionError>(protobuf::JoinOn {
2055 left: Some(l),
2056 right: Some(r),
2057 })
2058 })
2059 .collect::<Result<_>>()?;
2060 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2061 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2062 let filter = exec
2063 .filter()
2064 .as_ref()
2065 .map(|f| {
2066 let expression =
2067 serialize_physical_expr(f.expression(), extension_codec)?;
2068 let column_indices = f
2069 .column_indices()
2070 .iter()
2071 .map(|i| {
2072 let side: protobuf::JoinSide = i.side.to_owned().into();
2073 protobuf::ColumnIndex {
2074 index: i.index as u32,
2075 side: side.into(),
2076 }
2077 })
2078 .collect();
2079 let schema = f.schema().as_ref().try_into()?;
2080 Ok(protobuf::JoinFilter {
2081 expression: Some(expression),
2082 column_indices,
2083 schema: Some(schema),
2084 })
2085 })
2086 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2087
2088 let partition_mode = match exec.partition_mode() {
2089 StreamJoinPartitionMode::SinglePartition => {
2090 protobuf::StreamPartitionMode::SinglePartition
2091 }
2092 StreamJoinPartitionMode::Partitioned => {
2093 protobuf::StreamPartitionMode::PartitionedExec
2094 }
2095 };
2096
2097 let left_sort_exprs = exec
2098 .left_sort_exprs()
2099 .map(|exprs| {
2100 exprs
2101 .iter()
2102 .map(|expr| {
2103 Ok(protobuf::PhysicalSortExprNode {
2104 expr: Some(Box::new(serialize_physical_expr(
2105 &expr.expr,
2106 extension_codec,
2107 )?)),
2108 asc: !expr.options.descending,
2109 nulls_first: expr.options.nulls_first,
2110 })
2111 })
2112 .collect::<Result<Vec<_>>>()
2113 })
2114 .transpose()?
2115 .unwrap_or(vec![]);
2116
2117 let right_sort_exprs = exec
2118 .right_sort_exprs()
2119 .map(|exprs| {
2120 exprs
2121 .iter()
2122 .map(|expr| {
2123 Ok(protobuf::PhysicalSortExprNode {
2124 expr: Some(Box::new(serialize_physical_expr(
2125 &expr.expr,
2126 extension_codec,
2127 )?)),
2128 asc: !expr.options.descending,
2129 nulls_first: expr.options.nulls_first,
2130 })
2131 })
2132 .collect::<Result<Vec<_>>>()
2133 })
2134 .transpose()?
2135 .unwrap_or(vec![]);
2136
2137 Ok(protobuf::PhysicalPlanNode {
2138 physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2139 protobuf::SymmetricHashJoinExecNode {
2140 left: Some(Box::new(left)),
2141 right: Some(Box::new(right)),
2142 on,
2143 join_type: join_type.into(),
2144 partition_mode: partition_mode.into(),
2145 null_equality: null_equality.into(),
2146 left_sort_exprs,
2147 right_sort_exprs,
2148 filter,
2149 },
2150 ))),
2151 })
2152 }
2153
2154 fn try_from_cross_join_exec(
2155 exec: &CrossJoinExec,
2156 extension_codec: &dyn PhysicalExtensionCodec,
2157 ) -> Result<Self> {
2158 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2159 exec.left().to_owned(),
2160 extension_codec,
2161 )?;
2162 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2163 exec.right().to_owned(),
2164 extension_codec,
2165 )?;
2166 Ok(protobuf::PhysicalPlanNode {
2167 physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2168 protobuf::CrossJoinExecNode {
2169 left: Some(Box::new(left)),
2170 right: Some(Box::new(right)),
2171 },
2172 ))),
2173 })
2174 }
2175
2176 fn try_from_aggregate_exec(
2177 exec: &AggregateExec,
2178 extension_codec: &dyn PhysicalExtensionCodec,
2179 ) -> Result<Self> {
2180 let groups: Vec<bool> = exec
2181 .group_expr()
2182 .groups()
2183 .iter()
2184 .flatten()
2185 .copied()
2186 .collect();
2187
2188 let group_names = exec
2189 .group_expr()
2190 .expr()
2191 .iter()
2192 .map(|expr| expr.1.to_owned())
2193 .collect();
2194
2195 let filter = exec
2196 .filter_expr()
2197 .iter()
2198 .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2199 .collect::<Result<Vec<_>>>()?;
2200
2201 let agg = exec
2202 .aggr_expr()
2203 .iter()
2204 .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2205 .collect::<Result<Vec<_>>>()?;
2206
2207 let agg_names = exec
2208 .aggr_expr()
2209 .iter()
2210 .map(|expr| expr.name().to_string())
2211 .collect::<Vec<_>>();
2212
2213 let agg_mode = match exec.mode() {
2214 AggregateMode::Partial => protobuf::AggregateMode::Partial,
2215 AggregateMode::Final => protobuf::AggregateMode::Final,
2216 AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2217 AggregateMode::Single => protobuf::AggregateMode::Single,
2218 AggregateMode::SinglePartitioned => {
2219 protobuf::AggregateMode::SinglePartitioned
2220 }
2221 };
2222 let input_schema = exec.input_schema();
2223 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2224 exec.input().to_owned(),
2225 extension_codec,
2226 )?;
2227
2228 let null_expr = exec
2229 .group_expr()
2230 .null_expr()
2231 .iter()
2232 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2233 .collect::<Result<Vec<_>>>()?;
2234
2235 let group_expr = exec
2236 .group_expr()
2237 .expr()
2238 .iter()
2239 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2240 .collect::<Result<Vec<_>>>()?;
2241
2242 let limit = exec.limit().map(|value| protobuf::AggLimit {
2243 limit: value as u64,
2244 });
2245
2246 Ok(protobuf::PhysicalPlanNode {
2247 physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2248 protobuf::AggregateExecNode {
2249 group_expr,
2250 group_expr_name: group_names,
2251 aggr_expr: agg,
2252 filter_expr: filter,
2253 aggr_expr_name: agg_names,
2254 mode: agg_mode as i32,
2255 input: Some(Box::new(input)),
2256 input_schema: Some(input_schema.as_ref().try_into()?),
2257 null_expr,
2258 groups,
2259 limit,
2260 },
2261 ))),
2262 })
2263 }
2264
2265 fn try_from_empty_exec(
2266 empty: &EmptyExec,
2267 _extension_codec: &dyn PhysicalExtensionCodec,
2268 ) -> Result<Self> {
2269 let schema = empty.schema().as_ref().try_into()?;
2270 Ok(protobuf::PhysicalPlanNode {
2271 physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2272 schema: Some(schema),
2273 })),
2274 })
2275 }
2276
2277 fn try_from_placeholder_row_exec(
2278 empty: &PlaceholderRowExec,
2279 _extension_codec: &dyn PhysicalExtensionCodec,
2280 ) -> Result<Self> {
2281 let schema = empty.schema().as_ref().try_into()?;
2282 Ok(protobuf::PhysicalPlanNode {
2283 physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2284 protobuf::PlaceholderRowExecNode {
2285 schema: Some(schema),
2286 },
2287 )),
2288 })
2289 }
2290
2291 fn try_from_coalesce_batches_exec(
2292 coalesce_batches: &CoalesceBatchesExec,
2293 extension_codec: &dyn PhysicalExtensionCodec,
2294 ) -> Result<Self> {
2295 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2296 coalesce_batches.input().to_owned(),
2297 extension_codec,
2298 )?;
2299 Ok(protobuf::PhysicalPlanNode {
2300 physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2301 protobuf::CoalesceBatchesExecNode {
2302 input: Some(Box::new(input)),
2303 target_batch_size: coalesce_batches.target_batch_size() as u32,
2304 fetch: coalesce_batches.fetch().map(|n| n as u32),
2305 },
2306 ))),
2307 })
2308 }
2309
2310 fn try_from_data_source_exec(
2311 data_source_exec: &DataSourceExec,
2312 extension_codec: &dyn PhysicalExtensionCodec,
2313 ) -> Result<Option<Self>> {
2314 let data_source = data_source_exec.data_source();
2315 if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2316 let source = maybe_csv.file_source();
2317 if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2318 return Ok(Some(protobuf::PhysicalPlanNode {
2319 physical_plan_type: Some(PhysicalPlanType::CsvScan(
2320 protobuf::CsvScanExecNode {
2321 base_conf: Some(serialize_file_scan_config(
2322 maybe_csv,
2323 extension_codec,
2324 )?),
2325 has_header: csv_config.has_header(),
2326 delimiter: byte_to_string(
2327 csv_config.delimiter(),
2328 "delimiter",
2329 )?,
2330 quote: byte_to_string(csv_config.quote(), "quote")?,
2331 optional_escape: if let Some(escape) = csv_config.escape() {
2332 Some(
2333 protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2334 byte_to_string(escape, "escape")?,
2335 ),
2336 )
2337 } else {
2338 None
2339 },
2340 optional_comment: if let Some(comment) = csv_config.comment()
2341 {
2342 Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2343 byte_to_string(comment, "comment")?,
2344 ))
2345 } else {
2346 None
2347 },
2348 newlines_in_values: maybe_csv.newlines_in_values(),
2349 },
2350 )),
2351 }));
2352 }
2353 }
2354
2355 if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2356 let source = scan_conf.file_source();
2357 if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2358 return Ok(Some(protobuf::PhysicalPlanNode {
2359 physical_plan_type: Some(PhysicalPlanType::JsonScan(
2360 protobuf::JsonScanExecNode {
2361 base_conf: Some(serialize_file_scan_config(
2362 scan_conf,
2363 extension_codec,
2364 )?),
2365 },
2366 )),
2367 }));
2368 }
2369 }
2370
2371 #[cfg(feature = "parquet")]
2372 if let Some((maybe_parquet, conf)) =
2373 data_source_exec.downcast_to_file_source::<ParquetSource>()
2374 {
2375 let predicate = conf
2376 .predicate()
2377 .map(|pred| serialize_physical_expr(pred, extension_codec))
2378 .transpose()?;
2379 return Ok(Some(protobuf::PhysicalPlanNode {
2380 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2381 protobuf::ParquetScanExecNode {
2382 base_conf: Some(serialize_file_scan_config(
2383 maybe_parquet,
2384 extension_codec,
2385 )?),
2386 predicate,
2387 parquet_options: Some(conf.table_parquet_options().try_into()?),
2388 },
2389 )),
2390 }));
2391 }
2392
2393 #[cfg(feature = "avro")]
2394 if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2395 let source = maybe_avro.file_source();
2396 if source.as_any().downcast_ref::<AvroSource>().is_some() {
2397 return Ok(Some(protobuf::PhysicalPlanNode {
2398 physical_plan_type: Some(PhysicalPlanType::AvroScan(
2399 protobuf::AvroScanExecNode {
2400 base_conf: Some(serialize_file_scan_config(
2401 maybe_avro,
2402 extension_codec,
2403 )?),
2404 },
2405 )),
2406 }));
2407 }
2408 }
2409
2410 Ok(None)
2411 }
2412
2413 fn try_from_coalesce_partitions_exec(
2414 exec: &CoalescePartitionsExec,
2415 extension_codec: &dyn PhysicalExtensionCodec,
2416 ) -> Result<Self> {
2417 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2418 exec.input().to_owned(),
2419 extension_codec,
2420 )?;
2421 Ok(protobuf::PhysicalPlanNode {
2422 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2423 protobuf::CoalescePartitionsExecNode {
2424 input: Some(Box::new(input)),
2425 fetch: exec.fetch().map(|f| f as u32),
2426 },
2427 ))),
2428 })
2429 }
2430
2431 fn try_from_repartition_exec(
2432 exec: &RepartitionExec,
2433 extension_codec: &dyn PhysicalExtensionCodec,
2434 ) -> Result<Self> {
2435 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2436 exec.input().to_owned(),
2437 extension_codec,
2438 )?;
2439
2440 let pb_partitioning =
2441 serialize_partitioning(exec.partitioning(), extension_codec)?;
2442
2443 Ok(protobuf::PhysicalPlanNode {
2444 physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2445 protobuf::RepartitionExecNode {
2446 input: Some(Box::new(input)),
2447 partitioning: Some(pb_partitioning),
2448 },
2449 ))),
2450 })
2451 }
2452
2453 fn try_from_sort_exec(
2454 exec: &SortExec,
2455 extension_codec: &dyn PhysicalExtensionCodec,
2456 ) -> Result<Self> {
2457 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2458 exec.input().to_owned(),
2459 extension_codec,
2460 )?;
2461 let expr = exec
2462 .expr()
2463 .iter()
2464 .map(|expr| {
2465 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2466 expr: Some(Box::new(serialize_physical_expr(
2467 &expr.expr,
2468 extension_codec,
2469 )?)),
2470 asc: !expr.options.descending,
2471 nulls_first: expr.options.nulls_first,
2472 });
2473 Ok(protobuf::PhysicalExprNode {
2474 expr_type: Some(ExprType::Sort(sort_expr)),
2475 })
2476 })
2477 .collect::<Result<Vec<_>>>()?;
2478 Ok(protobuf::PhysicalPlanNode {
2479 physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2480 protobuf::SortExecNode {
2481 input: Some(Box::new(input)),
2482 expr,
2483 fetch: match exec.fetch() {
2484 Some(n) => n as i64,
2485 _ => -1,
2486 },
2487 preserve_partitioning: exec.preserve_partitioning(),
2488 },
2489 ))),
2490 })
2491 }
2492
2493 fn try_from_union_exec(
2494 union: &UnionExec,
2495 extension_codec: &dyn PhysicalExtensionCodec,
2496 ) -> Result<Self> {
2497 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2498 for input in union.inputs() {
2499 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2500 input.to_owned(),
2501 extension_codec,
2502 )?);
2503 }
2504 Ok(protobuf::PhysicalPlanNode {
2505 physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2506 inputs,
2507 })),
2508 })
2509 }
2510
2511 fn try_from_interleave_exec(
2512 interleave: &InterleaveExec,
2513 extension_codec: &dyn PhysicalExtensionCodec,
2514 ) -> Result<Self> {
2515 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2516 for input in interleave.inputs() {
2517 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2518 input.to_owned(),
2519 extension_codec,
2520 )?);
2521 }
2522 Ok(protobuf::PhysicalPlanNode {
2523 physical_plan_type: Some(PhysicalPlanType::Interleave(
2524 protobuf::InterleaveExecNode { inputs },
2525 )),
2526 })
2527 }
2528
2529 fn try_from_sort_preserving_merge_exec(
2530 exec: &SortPreservingMergeExec,
2531 extension_codec: &dyn PhysicalExtensionCodec,
2532 ) -> Result<Self> {
2533 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2534 exec.input().to_owned(),
2535 extension_codec,
2536 )?;
2537 let expr = exec
2538 .expr()
2539 .iter()
2540 .map(|expr| {
2541 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2542 expr: Some(Box::new(serialize_physical_expr(
2543 &expr.expr,
2544 extension_codec,
2545 )?)),
2546 asc: !expr.options.descending,
2547 nulls_first: expr.options.nulls_first,
2548 });
2549 Ok(protobuf::PhysicalExprNode {
2550 expr_type: Some(ExprType::Sort(sort_expr)),
2551 })
2552 })
2553 .collect::<Result<Vec<_>>>()?;
2554 Ok(protobuf::PhysicalPlanNode {
2555 physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2556 protobuf::SortPreservingMergeExecNode {
2557 input: Some(Box::new(input)),
2558 expr,
2559 fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2560 },
2561 ))),
2562 })
2563 }
2564
2565 fn try_from_nested_loop_join_exec(
2566 exec: &NestedLoopJoinExec,
2567 extension_codec: &dyn PhysicalExtensionCodec,
2568 ) -> Result<Self> {
2569 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2570 exec.left().to_owned(),
2571 extension_codec,
2572 )?;
2573 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2574 exec.right().to_owned(),
2575 extension_codec,
2576 )?;
2577
2578 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2579 let filter = exec
2580 .filter()
2581 .as_ref()
2582 .map(|f| {
2583 let expression =
2584 serialize_physical_expr(f.expression(), extension_codec)?;
2585 let column_indices = f
2586 .column_indices()
2587 .iter()
2588 .map(|i| {
2589 let side: protobuf::JoinSide = i.side.to_owned().into();
2590 protobuf::ColumnIndex {
2591 index: i.index as u32,
2592 side: side.into(),
2593 }
2594 })
2595 .collect();
2596 let schema = f.schema().as_ref().try_into()?;
2597 Ok(protobuf::JoinFilter {
2598 expression: Some(expression),
2599 column_indices,
2600 schema: Some(schema),
2601 })
2602 })
2603 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2604
2605 Ok(protobuf::PhysicalPlanNode {
2606 physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2607 protobuf::NestedLoopJoinExecNode {
2608 left: Some(Box::new(left)),
2609 right: Some(Box::new(right)),
2610 join_type: join_type.into(),
2611 filter,
2612 projection: exec.projection().map_or_else(Vec::new, |v| {
2613 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2614 }),
2615 },
2616 ))),
2617 })
2618 }
2619
2620 fn try_from_window_agg_exec(
2621 exec: &WindowAggExec,
2622 extension_codec: &dyn PhysicalExtensionCodec,
2623 ) -> Result<Self> {
2624 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2625 exec.input().to_owned(),
2626 extension_codec,
2627 )?;
2628
2629 let window_expr = exec
2630 .window_expr()
2631 .iter()
2632 .map(|e| serialize_physical_window_expr(e, extension_codec))
2633 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2634
2635 let partition_keys = exec
2636 .partition_keys()
2637 .iter()
2638 .map(|e| serialize_physical_expr(e, extension_codec))
2639 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2640
2641 Ok(protobuf::PhysicalPlanNode {
2642 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2643 protobuf::WindowAggExecNode {
2644 input: Some(Box::new(input)),
2645 window_expr,
2646 partition_keys,
2647 input_order_mode: None,
2648 },
2649 ))),
2650 })
2651 }
2652
2653 fn try_from_bounded_window_agg_exec(
2654 exec: &BoundedWindowAggExec,
2655 extension_codec: &dyn PhysicalExtensionCodec,
2656 ) -> Result<Self> {
2657 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2658 exec.input().to_owned(),
2659 extension_codec,
2660 )?;
2661
2662 let window_expr = exec
2663 .window_expr()
2664 .iter()
2665 .map(|e| serialize_physical_window_expr(e, extension_codec))
2666 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2667
2668 let partition_keys = exec
2669 .partition_keys()
2670 .iter()
2671 .map(|e| serialize_physical_expr(e, extension_codec))
2672 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2673
2674 let input_order_mode = match &exec.input_order_mode {
2675 InputOrderMode::Linear => {
2676 window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2677 }
2678 InputOrderMode::PartiallySorted(columns) => {
2679 window_agg_exec_node::InputOrderMode::PartiallySorted(
2680 protobuf::PartiallySortedInputOrderMode {
2681 columns: columns.iter().map(|c| *c as u64).collect(),
2682 },
2683 )
2684 }
2685 InputOrderMode::Sorted => {
2686 window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2687 }
2688 };
2689
2690 Ok(protobuf::PhysicalPlanNode {
2691 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2692 protobuf::WindowAggExecNode {
2693 input: Some(Box::new(input)),
2694 window_expr,
2695 partition_keys,
2696 input_order_mode: Some(input_order_mode),
2697 },
2698 ))),
2699 })
2700 }
2701
2702 fn try_from_data_sink_exec(
2703 exec: &DataSinkExec,
2704 extension_codec: &dyn PhysicalExtensionCodec,
2705 ) -> Result<Option<Self>> {
2706 let input: protobuf::PhysicalPlanNode =
2707 protobuf::PhysicalPlanNode::try_from_physical_plan(
2708 exec.input().to_owned(),
2709 extension_codec,
2710 )?;
2711 let sort_order = match exec.sort_order() {
2712 Some(requirements) => {
2713 let expr = requirements
2714 .iter()
2715 .map(|requirement| {
2716 let expr: PhysicalSortExpr = requirement.to_owned().into();
2717 let sort_expr = protobuf::PhysicalSortExprNode {
2718 expr: Some(Box::new(serialize_physical_expr(
2719 &expr.expr,
2720 extension_codec,
2721 )?)),
2722 asc: !expr.options.descending,
2723 nulls_first: expr.options.nulls_first,
2724 };
2725 Ok(sort_expr)
2726 })
2727 .collect::<Result<Vec<_>>>()?;
2728 Some(protobuf::PhysicalSortExprNodeCollection {
2729 physical_sort_expr_nodes: expr,
2730 })
2731 }
2732 None => None,
2733 };
2734
2735 if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
2736 return Ok(Some(protobuf::PhysicalPlanNode {
2737 physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
2738 protobuf::JsonSinkExecNode {
2739 input: Some(Box::new(input)),
2740 sink: Some(sink.try_into()?),
2741 sink_schema: Some(exec.schema().as_ref().try_into()?),
2742 sort_order,
2743 },
2744 ))),
2745 }));
2746 }
2747
2748 if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
2749 return Ok(Some(protobuf::PhysicalPlanNode {
2750 physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
2751 protobuf::CsvSinkExecNode {
2752 input: Some(Box::new(input)),
2753 sink: Some(sink.try_into()?),
2754 sink_schema: Some(exec.schema().as_ref().try_into()?),
2755 sort_order,
2756 },
2757 ))),
2758 }));
2759 }
2760
2761 #[cfg(feature = "parquet")]
2762 if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
2763 return Ok(Some(protobuf::PhysicalPlanNode {
2764 physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
2765 protobuf::ParquetSinkExecNode {
2766 input: Some(Box::new(input)),
2767 sink: Some(sink.try_into()?),
2768 sink_schema: Some(exec.schema().as_ref().try_into()?),
2769 sort_order,
2770 },
2771 ))),
2772 }));
2773 }
2774
2775 Ok(None)
2777 }
2778
2779 fn try_from_unnest_exec(
2780 exec: &UnnestExec,
2781 extension_codec: &dyn PhysicalExtensionCodec,
2782 ) -> Result<Self> {
2783 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2784 exec.input().to_owned(),
2785 extension_codec,
2786 )?;
2787
2788 Ok(protobuf::PhysicalPlanNode {
2789 physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2790 protobuf::UnnestExecNode {
2791 input: Some(Box::new(input)),
2792 schema: Some(exec.schema().try_into()?),
2793 list_type_columns: exec
2794 .list_column_indices()
2795 .iter()
2796 .map(|c| ProtoListUnnest {
2797 index_in_input_schema: c.index_in_input_schema as _,
2798 depth: c.depth as _,
2799 })
2800 .collect(),
2801 struct_type_columns: exec
2802 .struct_column_indices()
2803 .iter()
2804 .map(|c| *c as _)
2805 .collect(),
2806 options: Some(exec.options().into()),
2807 },
2808 ))),
2809 })
2810 }
2811
2812 fn try_from_cooperative_exec(
2813 exec: &CooperativeExec,
2814 extension_codec: &dyn PhysicalExtensionCodec,
2815 ) -> Result<Self> {
2816 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2817 exec.input().to_owned(),
2818 extension_codec,
2819 )?;
2820
2821 Ok(protobuf::PhysicalPlanNode {
2822 physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
2823 protobuf::CooperativeExecNode {
2824 input: Some(Box::new(input)),
2825 },
2826 ))),
2827 })
2828 }
2829}
2830
2831pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
2832 fn try_decode(buf: &[u8]) -> Result<Self>
2833 where
2834 Self: Sized;
2835
2836 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
2837 where
2838 B: BufMut,
2839 Self: Sized;
2840
2841 fn try_into_physical_plan(
2842 &self,
2843 registry: &dyn FunctionRegistry,
2844 runtime: &RuntimeEnv,
2845 extension_codec: &dyn PhysicalExtensionCodec,
2846 ) -> Result<Arc<dyn ExecutionPlan>>;
2847
2848 fn try_from_physical_plan(
2849 plan: Arc<dyn ExecutionPlan>,
2850 extension_codec: &dyn PhysicalExtensionCodec,
2851 ) -> Result<Self>
2852 where
2853 Self: Sized;
2854}
2855
2856pub trait PhysicalExtensionCodec: Debug + Send + Sync {
2857 fn try_decode(
2858 &self,
2859 buf: &[u8],
2860 inputs: &[Arc<dyn ExecutionPlan>],
2861 registry: &dyn FunctionRegistry,
2862 ) -> Result<Arc<dyn ExecutionPlan>>;
2863
2864 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
2865
2866 fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
2867 not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
2868 }
2869
2870 fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
2871 Ok(())
2872 }
2873
2874 fn try_decode_expr(
2875 &self,
2876 _buf: &[u8],
2877 _inputs: &[Arc<dyn PhysicalExpr>],
2878 ) -> Result<Arc<dyn PhysicalExpr>> {
2879 not_impl_err!("PhysicalExtensionCodec is not provided")
2880 }
2881
2882 fn try_encode_expr(
2883 &self,
2884 _node: &Arc<dyn PhysicalExpr>,
2885 _buf: &mut Vec<u8>,
2886 ) -> Result<()> {
2887 not_impl_err!("PhysicalExtensionCodec is not provided")
2888 }
2889
2890 fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
2891 not_impl_err!(
2892 "PhysicalExtensionCodec is not provided for aggregate function {name}"
2893 )
2894 }
2895
2896 fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
2897 Ok(())
2898 }
2899
2900 fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
2901 not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
2902 }
2903
2904 fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
2905 Ok(())
2906 }
2907}
2908
2909#[derive(Debug)]
2910pub struct DefaultPhysicalExtensionCodec {}
2911
2912impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
2913 fn try_decode(
2914 &self,
2915 _buf: &[u8],
2916 _inputs: &[Arc<dyn ExecutionPlan>],
2917 _registry: &dyn FunctionRegistry,
2918 ) -> Result<Arc<dyn ExecutionPlan>> {
2919 not_impl_err!("PhysicalExtensionCodec is not provided")
2920 }
2921
2922 fn try_encode(
2923 &self,
2924 _node: Arc<dyn ExecutionPlan>,
2925 _buf: &mut Vec<u8>,
2926 ) -> Result<()> {
2927 not_impl_err!("PhysicalExtensionCodec is not provided")
2928 }
2929}
2930
2931fn into_physical_plan(
2932 node: &Option<Box<protobuf::PhysicalPlanNode>>,
2933 registry: &dyn FunctionRegistry,
2934 runtime: &RuntimeEnv,
2935 extension_codec: &dyn PhysicalExtensionCodec,
2936) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2937 if let Some(field) = node {
2938 field.try_into_physical_plan(registry, runtime, extension_codec)
2939 } else {
2940 Err(proto_error("Missing required field in protobuf"))
2941 }
2942}