1use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25 parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26 parse_physical_window_expr, parse_protobuf_file_scan_config,
27 parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31 serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37 self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::SchemaRef;
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53 CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::empty::EmptyExec;
68use datafusion::physical_plan::explain::ExplainExec;
69use datafusion::physical_plan::expressions::PhysicalSortExpr;
70use datafusion::physical_plan::filter::FilterExec;
71use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
72use datafusion::physical_plan::joins::{
73 CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
74};
75use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
76use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
77use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
78use datafusion::physical_plan::projection::ProjectionExec;
79use datafusion::physical_plan::repartition::RepartitionExec;
80use datafusion::physical_plan::sorts::sort::SortExec;
81use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
82use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
83use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
84use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
85use datafusion::physical_plan::{
86 ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
87};
88use datafusion_common::config::TableParquetOptions;
89use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
90use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
91
92use prost::bytes::BufMut;
93use prost::Message;
94
95pub mod from_proto;
96pub mod to_proto;
97
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    /// Decode a protobuf-encoded physical plan node from a byte buffer.
    ///
    /// Wraps any prost decode failure in a `DataFusionError::Internal`.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
        })
    }

    /// Encode this node into `buf` using the protobuf wire format.
    ///
    /// Wraps any prost encode failure in a `DataFusionError::Internal`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
        })
    }

    /// Convert this protobuf node (and, recursively, its inputs) into an
    /// executable [`ExecutionPlan`].
    ///
    /// Dispatches on the `physical_plan_type` oneof to a dedicated
    /// `try_into_*_physical_plan` helper. Errors if the oneof field is unset
    /// (i.e. the message is empty or carries an unknown variant).
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        // One arm per supported operator; each helper rebuilds its children
        // recursively through `try_into_physical_plan`.
        match plan {
            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
                explain,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Projection(projection) => self
                .try_into_projection_physical_plan(
                    projection,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
                filter,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            // When the `parquet` feature is off the helper panics; silence
            // the unused-variable lint on its arguments in that build.
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetScan(scan) => self
                .try_into_parquet_scan_physical_plan(
                    scan,
                    registry,
                    runtime,
                    extension_codec,
                ),
            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
            PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
                merge,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Repartition(repart) => self
                .try_into_repartition_physical_plan(
                    repart,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::GlobalLimit(limit) => self
                .try_into_global_limit_physical_plan(
                    limit,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::LocalLimit(limit) => self
                .try_into_local_limit_physical_plan(
                    limit,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
                window_agg,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Aggregate(hash_agg) => self
                .try_into_aggregate_physical_plan(
                    hash_agg,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::HashJoin(hashjoin) => self
                .try_into_hash_join_physical_plan(
                    hashjoin,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
                union,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Interleave(interleave) => self
                .try_into_interleave_physical_plan(
                    interleave,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::CrossJoin(crossjoin) => self
                .try_into_cross_join_physical_plan(
                    crossjoin,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
                empty,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::PlaceholderRow(placeholder) => self
                .try_into_placeholder_row_physical_plan(
                    placeholder,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(
                    sort,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Extension(extension) => self
                .try_into_extension_physical_plan(
                    extension,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::NestedLoopJoin(join) => self
                .try_into_nested_loop_join_physical_plan(
                    join,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
                analyze,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
                sink,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
                sink,
                registry,
                runtime,
                extension_codec,
            ),

            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => self
                .try_into_parquet_sink_physical_plan(
                    sink,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
                unnest,
                registry,
                runtime,
                extension_codec,
            ),
        }
    }

    /// Serialize a physical plan into a protobuf node.
    ///
    /// Tries each known operator type via `downcast_ref`; when none matches,
    /// falls back to the user-supplied `extension_codec` and wraps the opaque
    /// bytes (plus recursively serialized children) in a
    /// `PhysicalExtensionNode`.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension-codec fallback below; the
        // original Arc is consumed by `as_any()` for downcasting.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        // DataSourceExec may wrap a file format the proto schema does not
        // know about; `try_from_data_source_exec` returns Ok(None) in that
        // case so we can fall through to the extension codec.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        // Same Ok(None) fall-through convention as DataSourceExec above.
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        // No built-in operator matched: delegate to the extension codec and
        // package the result as a PhysicalExtensionNode.
        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
543
544impl protobuf::PhysicalPlanNode {
545 fn try_into_explain_physical_plan(
546 &self,
547 explain: &protobuf::ExplainExecNode,
548 _registry: &dyn FunctionRegistry,
549 _runtime: &RuntimeEnv,
550 _extension_codec: &dyn PhysicalExtensionCodec,
551 ) -> Result<Arc<dyn ExecutionPlan>> {
552 Ok(Arc::new(ExplainExec::new(
553 Arc::new(explain.schema.as_ref().unwrap().try_into()?),
554 explain
555 .stringified_plans
556 .iter()
557 .map(|plan| plan.into())
558 .collect(),
559 explain.verbose,
560 )))
561 }
562
563 fn try_into_projection_physical_plan(
564 &self,
565 projection: &protobuf::ProjectionExecNode,
566 registry: &dyn FunctionRegistry,
567 runtime: &RuntimeEnv,
568 extension_codec: &dyn PhysicalExtensionCodec,
569 ) -> Result<Arc<dyn ExecutionPlan>> {
570 let input: Arc<dyn ExecutionPlan> =
571 into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
572 let exprs = projection
573 .expr
574 .iter()
575 .zip(projection.expr_name.iter())
576 .map(|(expr, name)| {
577 Ok((
578 parse_physical_expr(
579 expr,
580 registry,
581 input.schema().as_ref(),
582 extension_codec,
583 )?,
584 name.to_string(),
585 ))
586 })
587 .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
588 Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
589 }
590
591 fn try_into_filter_physical_plan(
592 &self,
593 filter: &protobuf::FilterExecNode,
594 registry: &dyn FunctionRegistry,
595 runtime: &RuntimeEnv,
596 extension_codec: &dyn PhysicalExtensionCodec,
597 ) -> Result<Arc<dyn ExecutionPlan>> {
598 let input: Arc<dyn ExecutionPlan> =
599 into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
600 let predicate = filter
601 .expr
602 .as_ref()
603 .map(|expr| {
604 parse_physical_expr(
605 expr,
606 registry,
607 input.schema().as_ref(),
608 extension_codec,
609 )
610 })
611 .transpose()?
612 .ok_or_else(|| {
613 DataFusionError::Internal(
614 "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
615 )
616 })?;
617 let filter_selectivity = filter.default_filter_selectivity.try_into();
618 let projection = if !filter.projection.is_empty() {
619 Some(
620 filter
621 .projection
622 .iter()
623 .map(|i| *i as usize)
624 .collect::<Vec<_>>(),
625 )
626 } else {
627 None
628 };
629 let filter =
630 FilterExec::try_new(predicate, input)?.with_projection(projection)?;
631 match filter_selectivity {
632 Ok(filter_selectivity) => Ok(Arc::new(
633 filter.with_default_selectivity(filter_selectivity)?,
634 )),
635 Err(_) => Err(DataFusionError::Internal(
636 "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
637 )),
638 }
639 }
640
641 fn try_into_csv_scan_physical_plan(
642 &self,
643 scan: &protobuf::CsvScanExecNode,
644 registry: &dyn FunctionRegistry,
645 _runtime: &RuntimeEnv,
646 extension_codec: &dyn PhysicalExtensionCodec,
647 ) -> Result<Arc<dyn ExecutionPlan>> {
648 let escape =
649 if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
650 &scan.optional_escape
651 {
652 Some(str_to_byte(escape, "escape")?)
653 } else {
654 None
655 };
656
657 let comment = if let Some(
658 protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
659 ) = &scan.optional_comment
660 {
661 Some(str_to_byte(comment, "comment")?)
662 } else {
663 None
664 };
665
666 let source = Arc::new(
667 CsvSource::new(
668 scan.has_header,
669 str_to_byte(&scan.delimiter, "delimiter")?,
670 0,
671 )
672 .with_escape(escape)
673 .with_comment(comment),
674 );
675
676 let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
677 scan.base_conf.as_ref().unwrap(),
678 registry,
679 extension_codec,
680 source,
681 )?)
682 .with_newlines_in_values(scan.newlines_in_values)
683 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
684 .build();
685 Ok(DataSourceExec::from_data_source(conf))
686 }
687
688 fn try_into_json_scan_physical_plan(
689 &self,
690 scan: &protobuf::JsonScanExecNode,
691 registry: &dyn FunctionRegistry,
692 _runtime: &RuntimeEnv,
693 extension_codec: &dyn PhysicalExtensionCodec,
694 ) -> Result<Arc<dyn ExecutionPlan>> {
695 let scan_conf = parse_protobuf_file_scan_config(
696 scan.base_conf.as_ref().unwrap(),
697 registry,
698 extension_codec,
699 Arc::new(JsonSource::new()),
700 )?;
701 Ok(DataSourceExec::from_data_source(scan_conf))
702 }
703
    /// Decode a `ParquetScanExecNode` into a [`DataSourceExec`] backed by a
    /// [`ParquetSource`], including an optional pushed-down predicate and
    /// table-level parquet options.
    ///
    /// Panics (by design, see the `cfg` branch) when the `parquet` feature
    /// is not compiled in.
    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
    fn try_into_parquet_scan_physical_plan(
        &self,
        scan: &protobuf::ParquetScanExecNode,
        registry: &dyn FunctionRegistry,
        _runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            // The predicate is parsed against the file schema, not the
            // (possibly projected) output schema.
            let schema =
                parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
            let predicate = scan
                .predicate
                .as_ref()
                .map(|expr| {
                    parse_physical_expr(expr, registry, schema.as_ref(), extension_codec)
                })
                .transpose()?;
            // Absent options fall back to the defaults.
            let mut options = TableParquetOptions::default();

            if let Some(table_options) = scan.parquet_options.as_ref() {
                options = table_options.try_into()?;
            }
            let mut source = ParquetSource::new(options);

            if let Some(predicate) = predicate {
                source = source.with_predicate(predicate);
            }
            let base_config = parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().unwrap(),
                registry,
                extension_codec,
                Arc::new(source),
            )?;
            Ok(DataSourceExec::from_data_source(base_config))
        }
        #[cfg(not(feature = "parquet"))]
        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
    }
744
    /// Decode an `AvroScanExecNode` into a [`DataSourceExec`] backed by an
    /// [`AvroSource`].
    ///
    /// Panics (by design, see the `cfg` branch) when the `avro` feature is
    /// not compiled in.
    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
    fn try_into_avro_scan_physical_plan(
        &self,
        scan: &protobuf::AvroScanExecNode,
        registry: &dyn FunctionRegistry,
        _runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "avro")]
        {
            let conf = parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().unwrap(),
                registry,
                extension_codec,
                Arc::new(AvroSource::new()),
            )?;
            Ok(DataSourceExec::from_data_source(conf))
        }
        #[cfg(not(feature = "avro"))]
        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
    }
766
767 fn try_into_coalesce_batches_physical_plan(
768 &self,
769 coalesce_batches: &protobuf::CoalesceBatchesExecNode,
770 registry: &dyn FunctionRegistry,
771 runtime: &RuntimeEnv,
772 extension_codec: &dyn PhysicalExtensionCodec,
773 ) -> Result<Arc<dyn ExecutionPlan>> {
774 let input: Arc<dyn ExecutionPlan> = into_physical_plan(
775 &coalesce_batches.input,
776 registry,
777 runtime,
778 extension_codec,
779 )?;
780 Ok(Arc::new(
781 CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
782 .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
783 ))
784 }
785
786 fn try_into_merge_physical_plan(
787 &self,
788 merge: &protobuf::CoalescePartitionsExecNode,
789 registry: &dyn FunctionRegistry,
790 runtime: &RuntimeEnv,
791 extension_codec: &dyn PhysicalExtensionCodec,
792 ) -> Result<Arc<dyn ExecutionPlan>> {
793 let input: Arc<dyn ExecutionPlan> =
794 into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
795 Ok(Arc::new(
796 CoalescePartitionsExec::new(input)
797 .with_fetch(merge.fetch.map(|f| f as usize)),
798 ))
799 }
800
801 fn try_into_repartition_physical_plan(
802 &self,
803 repart: &protobuf::RepartitionExecNode,
804 registry: &dyn FunctionRegistry,
805 runtime: &RuntimeEnv,
806 extension_codec: &dyn PhysicalExtensionCodec,
807 ) -> Result<Arc<dyn ExecutionPlan>> {
808 let input: Arc<dyn ExecutionPlan> =
809 into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
810 let partitioning = parse_protobuf_partitioning(
811 repart.partitioning.as_ref(),
812 registry,
813 input.schema().as_ref(),
814 extension_codec,
815 )?;
816 Ok(Arc::new(RepartitionExec::try_new(
817 input,
818 partitioning.unwrap(),
819 )?))
820 }
821
822 fn try_into_global_limit_physical_plan(
823 &self,
824 limit: &protobuf::GlobalLimitExecNode,
825 registry: &dyn FunctionRegistry,
826 runtime: &RuntimeEnv,
827 extension_codec: &dyn PhysicalExtensionCodec,
828 ) -> Result<Arc<dyn ExecutionPlan>> {
829 let input: Arc<dyn ExecutionPlan> =
830 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
831 let fetch = if limit.fetch >= 0 {
832 Some(limit.fetch as usize)
833 } else {
834 None
835 };
836 Ok(Arc::new(GlobalLimitExec::new(
837 input,
838 limit.skip as usize,
839 fetch,
840 )))
841 }
842
843 fn try_into_local_limit_physical_plan(
844 &self,
845 limit: &protobuf::LocalLimitExecNode,
846 registry: &dyn FunctionRegistry,
847 runtime: &RuntimeEnv,
848 extension_codec: &dyn PhysicalExtensionCodec,
849 ) -> Result<Arc<dyn ExecutionPlan>> {
850 let input: Arc<dyn ExecutionPlan> =
851 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
852 Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
853 }
854
855 fn try_into_window_physical_plan(
856 &self,
857 window_agg: &protobuf::WindowAggExecNode,
858 registry: &dyn FunctionRegistry,
859 runtime: &RuntimeEnv,
860 extension_codec: &dyn PhysicalExtensionCodec,
861 ) -> Result<Arc<dyn ExecutionPlan>> {
862 let input: Arc<dyn ExecutionPlan> =
863 into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
864 let input_schema = input.schema();
865
866 let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
867 .window_expr
868 .iter()
869 .map(|window_expr| {
870 parse_physical_window_expr(
871 window_expr,
872 registry,
873 input_schema.as_ref(),
874 extension_codec,
875 )
876 })
877 .collect::<Result<Vec<_>, _>>()?;
878
879 let partition_keys = window_agg
880 .partition_keys
881 .iter()
882 .map(|expr| {
883 parse_physical_expr(
884 expr,
885 registry,
886 input.schema().as_ref(),
887 extension_codec,
888 )
889 })
890 .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
891
892 if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
893 let input_order_mode = match input_order_mode {
894 window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
895 window_agg_exec_node::InputOrderMode::PartiallySorted(
896 protobuf::PartiallySortedInputOrderMode { columns },
897 ) => InputOrderMode::PartiallySorted(
898 columns.iter().map(|c| *c as usize).collect(),
899 ),
900 window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
901 };
902
903 Ok(Arc::new(BoundedWindowAggExec::try_new(
904 physical_window_expr,
905 input,
906 input_order_mode,
907 !partition_keys.is_empty(),
908 )?))
909 } else {
910 Ok(Arc::new(WindowAggExec::try_new(
911 physical_window_expr,
912 input,
913 !partition_keys.is_empty(),
914 )?))
915 }
916 }
917
    /// Decode an `AggregateExecNode` into an [`AggregateExec`].
    ///
    /// Rebuilds the aggregation mode, the group-by and grouping-set null
    /// expressions, per-aggregate filter expressions, and the aggregate
    /// expressions themselves. User-defined aggregate functions are resolved
    /// from the registry, falling back to the extension codec.
    fn try_into_aggregate_physical_plan(
        &self,
        hash_agg: &protobuf::AggregateExecNode,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
        // Map the protobuf aggregation mode onto the engine's enum; unknown
        // discriminants are a decode error.
        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
            proto_error(format!(
                "Received a AggregateNode message with unknown AggregateMode {}",
                hash_agg.mode
            ))
        })?;
        let agg_mode: AggregateMode = match mode {
            protobuf::AggregateMode::Partial => AggregateMode::Partial,
            protobuf::AggregateMode::Final => AggregateMode::Final,
            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
            protobuf::AggregateMode::Single => AggregateMode::Single,
            protobuf::AggregateMode::SinglePartitioned => {
                AggregateMode::SinglePartitioned
            }
        };

        let num_expr = hash_agg.group_expr.len();

        // Group-by expressions are stored parallel to their output names.
        let group_expr = hash_agg
            .group_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(
                    expr,
                    registry,
                    input.schema().as_ref(),
                    extension_codec,
                )
                .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Null expressions mirror the group-by list; they are used by
        // grouping sets, and reuse group_expr_name for their aliases.
        let null_expr = hash_agg
            .null_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(
                    expr,
                    registry,
                    input.schema().as_ref(),
                    extension_codec,
                )
                .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // The grouping-set membership mask is flattened on the wire; re-chunk
        // it into one boolean row per grouping set.
        // NOTE(review): assumes num_expr > 0 whenever `groups` is non-empty —
        // `chunks(0)` would panic; presumably guaranteed by the serializer.
        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
            hash_agg
                .groups
                .chunks(num_expr)
                .map(|g| g.to_vec())
                .collect::<Vec<Vec<bool>>>()
        } else {
            vec![]
        };

        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
            DataFusionError::Internal(
                "input_schema in AggregateNode is missing.".to_owned(),
            )
        })?;
        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

        // One optional filter expression per aggregate (FILTER (WHERE ...)).
        let physical_filter_expr = hash_agg
            .filter_expr
            .iter()
            .map(|expr| {
                expr.expr
                    .as_ref()
                    .map(|e| {
                        parse_physical_expr(
                            e,
                            registry,
                            &physical_schema,
                            extension_codec,
                        )
                    })
                    .transpose()
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Rebuild each aggregate expression: only AggregateExpr nodes are
        // valid here; each carries its argument expressions, an optional
        // ordering requirement, and the UDAF reference.
        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
            .aggr_expr
            .iter()
            .zip(hash_agg.aggr_expr_name.iter())
            .map(|(expr, name)| {
                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error("Unexpected empty aggregate physical expression")
                })?;

                match expr_type {
                    ExprType::AggregateExpr(agg_node) => {
                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                            .expr
                            .iter()
                            .map(|e| {
                                parse_physical_expr(
                                    e,
                                    registry,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        let ordering_req: LexOrdering = agg_node
                            .ordering_req
                            .iter()
                            .map(|e| {
                                parse_physical_sort_expr(
                                    e,
                                    registry,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<LexOrdering>>()?;
                        agg_node
                            .aggregate_function
                            .as_ref()
                            .map(|func| match func {
                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                    // Resolution order: an inline serialized
                                    // definition wins; otherwise look up the
                                    // registry, then fall back to the codec.
                                    let agg_udf = match &agg_node.fun_definition {
                                        Some(buf) => extension_codec
                                            .try_decode_udaf(udaf_name, buf)?,
                                        None => {
                                            registry.udaf(udaf_name).or_else(|_| {
                                                extension_codec
                                                    .try_decode_udaf(udaf_name, &[])
                                            })?
                                        }
                                    };

                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                        .schema(Arc::clone(&physical_schema))
                                        .alias(name)
                                        .with_ignore_nulls(agg_node.ignore_nulls)
                                        .with_distinct(agg_node.distinct)
                                        .order_by(ordering_req)
                                        .build()
                                        .map(Arc::new)
                                }
                            })
                            .transpose()?
                            .ok_or_else(|| {
                                proto_error(
                                    "Invalid AggregateExpr, missing aggregate_function",
                                )
                            })
                    }
                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        let limit = hash_agg
            .limit
            .as_ref()
            .map(|lit_value| lit_value.limit as usize);

        let agg = AggregateExec::try_new(
            agg_mode,
            PhysicalGroupBy::new(group_expr, null_expr, groups),
            physical_aggr_expr,
            physical_filter_expr,
            input,
            physical_schema,
        )?;

        let agg = agg.with_limit(limit);

        Ok(Arc::new(agg))
    }
1101
    /// Deserializes a `HashJoinExecNode` protobuf into a [`HashJoinExec`].
    ///
    /// Rebuilds both child plans, the equi-join key pairs (`on`), the optional
    /// non-equi [`JoinFilter`], the partition mode, and an optional output
    /// projection.
    fn try_into_hash_join_physical_plan(
        &self,
        hashjoin: &protobuf::HashJoinExecNode,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
        let left_schema = left.schema();
        let right_schema = right.schema();
        // Each side of a join key pair is parsed against its own input schema.
        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
            .on
            .iter()
            .map(|col| {
                let left = parse_physical_expr(
                    &col.left.clone().unwrap(),
                    registry,
                    left_schema.as_ref(),
                    extension_codec,
                )?;
                let right = parse_physical_expr(
                    &col.right.clone().unwrap(),
                    registry,
                    right_schema.as_ref(),
                    extension_codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;
        let join_type =
            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown JoinType {}",
                    hashjoin.join_type
                ))
            })?;
        // The optional filter carries its own intermediate schema; the filter
        // expression is parsed against that schema, not the join inputs.
        let filter = hashjoin
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    registry, &schema,
                    extension_codec,
                )?;
                // Column indices map filter-schema columns back to a join side.
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
                                "Received a HashJoinNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown PartitionMode {}",
                    hashjoin.partition_mode
                ))
            })?;
        let partition_mode = match partition_mode {
            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
            protobuf::PartitionMode::Auto => PartitionMode::Auto,
        };
        // An empty projection list in the proto encodes "no projection".
        let projection = if !hashjoin.projection.is_empty() {
            Some(
                hashjoin
                    .projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };
        Ok(Arc::new(HashJoinExec::try_new(
            left,
            right,
            on,
            filter,
            &join_type.into(),
            projection,
            partition_mode,
            hashjoin.null_equals_null,
        )?))
    }
1212
1213 fn try_into_symmetric_hash_join_physical_plan(
1214 &self,
1215 sym_join: &protobuf::SymmetricHashJoinExecNode,
1216 registry: &dyn FunctionRegistry,
1217 runtime: &RuntimeEnv,
1218 extension_codec: &dyn PhysicalExtensionCodec,
1219 ) -> Result<Arc<dyn ExecutionPlan>> {
1220 let left =
1221 into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1222 let right =
1223 into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1224 let left_schema = left.schema();
1225 let right_schema = right.schema();
1226 let on = sym_join
1227 .on
1228 .iter()
1229 .map(|col| {
1230 let left = parse_physical_expr(
1231 &col.left.clone().unwrap(),
1232 registry,
1233 left_schema.as_ref(),
1234 extension_codec,
1235 )?;
1236 let right = parse_physical_expr(
1237 &col.right.clone().unwrap(),
1238 registry,
1239 right_schema.as_ref(),
1240 extension_codec,
1241 )?;
1242 Ok((left, right))
1243 })
1244 .collect::<Result<_>>()?;
1245 let join_type =
1246 protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1247 proto_error(format!(
1248 "Received a SymmetricHashJoin message with unknown JoinType {}",
1249 sym_join.join_type
1250 ))
1251 })?;
1252 let filter = sym_join
1253 .filter
1254 .as_ref()
1255 .map(|f| {
1256 let schema = f
1257 .schema
1258 .as_ref()
1259 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1260 .try_into()?;
1261
1262 let expression = parse_physical_expr(
1263 f.expression.as_ref().ok_or_else(|| {
1264 proto_error("Unexpected empty filter expression")
1265 })?,
1266 registry, &schema,
1267 extension_codec,
1268 )?;
1269 let column_indices = f.column_indices
1270 .iter()
1271 .map(|i| {
1272 let side = protobuf::JoinSide::try_from(i.side)
1273 .map_err(|_| proto_error(format!(
1274 "Received a HashJoinNode message with JoinSide in Filter {}",
1275 i.side))
1276 )?;
1277
1278 Ok(ColumnIndex {
1279 index: i.index as usize,
1280 side: side.into(),
1281 })
1282 })
1283 .collect::<Result<_>>()?;
1284
1285 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1286 })
1287 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1288
1289 let left_sort_exprs = parse_physical_sort_exprs(
1290 &sym_join.left_sort_exprs,
1291 registry,
1292 &left_schema,
1293 extension_codec,
1294 )?;
1295 let left_sort_exprs = if left_sort_exprs.is_empty() {
1296 None
1297 } else {
1298 Some(left_sort_exprs)
1299 };
1300
1301 let right_sort_exprs = parse_physical_sort_exprs(
1302 &sym_join.right_sort_exprs,
1303 registry,
1304 &right_schema,
1305 extension_codec,
1306 )?;
1307 let right_sort_exprs = if right_sort_exprs.is_empty() {
1308 None
1309 } else {
1310 Some(right_sort_exprs)
1311 };
1312
1313 let partition_mode = protobuf::StreamPartitionMode::try_from(
1314 sym_join.partition_mode,
1315 )
1316 .map_err(|_| {
1317 proto_error(format!(
1318 "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1319 sym_join.partition_mode
1320 ))
1321 })?;
1322 let partition_mode = match partition_mode {
1323 protobuf::StreamPartitionMode::SinglePartition => {
1324 StreamJoinPartitionMode::SinglePartition
1325 }
1326 protobuf::StreamPartitionMode::PartitionedExec => {
1327 StreamJoinPartitionMode::Partitioned
1328 }
1329 };
1330 SymmetricHashJoinExec::try_new(
1331 left,
1332 right,
1333 on,
1334 filter,
1335 &join_type.into(),
1336 sym_join.null_equals_null,
1337 left_sort_exprs,
1338 right_sort_exprs,
1339 partition_mode,
1340 )
1341 .map(|e| Arc::new(e) as _)
1342 }
1343
1344 fn try_into_union_physical_plan(
1345 &self,
1346 union: &protobuf::UnionExecNode,
1347 registry: &dyn FunctionRegistry,
1348 runtime: &RuntimeEnv,
1349 extension_codec: &dyn PhysicalExtensionCodec,
1350 ) -> Result<Arc<dyn ExecutionPlan>> {
1351 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1352 for input in &union.inputs {
1353 inputs.push(input.try_into_physical_plan(
1354 registry,
1355 runtime,
1356 extension_codec,
1357 )?);
1358 }
1359 Ok(Arc::new(UnionExec::new(inputs)))
1360 }
1361
1362 fn try_into_interleave_physical_plan(
1363 &self,
1364 interleave: &protobuf::InterleaveExecNode,
1365 registry: &dyn FunctionRegistry,
1366 runtime: &RuntimeEnv,
1367 extension_codec: &dyn PhysicalExtensionCodec,
1368 ) -> Result<Arc<dyn ExecutionPlan>> {
1369 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1370 for input in &interleave.inputs {
1371 inputs.push(input.try_into_physical_plan(
1372 registry,
1373 runtime,
1374 extension_codec,
1375 )?);
1376 }
1377 Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1378 }
1379
1380 fn try_into_cross_join_physical_plan(
1381 &self,
1382 crossjoin: &protobuf::CrossJoinExecNode,
1383 registry: &dyn FunctionRegistry,
1384 runtime: &RuntimeEnv,
1385 extension_codec: &dyn PhysicalExtensionCodec,
1386 ) -> Result<Arc<dyn ExecutionPlan>> {
1387 let left: Arc<dyn ExecutionPlan> =
1388 into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1389 let right: Arc<dyn ExecutionPlan> =
1390 into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1391 Ok(Arc::new(CrossJoinExec::new(left, right)))
1392 }
1393
1394 fn try_into_empty_physical_plan(
1395 &self,
1396 empty: &protobuf::EmptyExecNode,
1397 _registry: &dyn FunctionRegistry,
1398 _runtime: &RuntimeEnv,
1399 _extension_codec: &dyn PhysicalExtensionCodec,
1400 ) -> Result<Arc<dyn ExecutionPlan>> {
1401 let schema = Arc::new(convert_required!(empty.schema)?);
1402 Ok(Arc::new(EmptyExec::new(schema)))
1403 }
1404
1405 fn try_into_placeholder_row_physical_plan(
1406 &self,
1407 placeholder: &protobuf::PlaceholderRowExecNode,
1408 _registry: &dyn FunctionRegistry,
1409 _runtime: &RuntimeEnv,
1410 _extension_codec: &dyn PhysicalExtensionCodec,
1411 ) -> Result<Arc<dyn ExecutionPlan>> {
1412 let schema = Arc::new(convert_required!(placeholder.schema)?);
1413 Ok(Arc::new(PlaceholderRowExec::new(schema)))
1414 }
1415
    /// Deserializes a `SortExecNode` into a [`SortExec`].
    ///
    /// Every serialized expression must be an `ExprType::Sort`; anything else
    /// is an internal error. A negative `fetch` encodes "no limit".
    fn try_into_sort_physical_plan(
        &self,
        sort: &protobuf::SortExecNode,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
                        // The proto stores "ascending"; SortOptions stores
                        // "descending", hence the negation.
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    internal_err!(
                        "physical_plan::from_proto() {self:?}"
                    )
                }
            })
            .collect::<Result<LexOrdering, _>>()?;
        // fetch < 0 is the proto encoding for "no limit".
        let fetch = if sort.fetch < 0 {
            None
        } else {
            Some(sort.fetch as usize)
        };
        let new_sort = SortExec::new(exprs, input)
            .with_fetch(fetch)
            .with_preserve_partitioning(sort.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }
1469
    /// Deserializes a `SortPreservingMergeExecNode` into a
    /// [`SortPreservingMergeExec`].
    ///
    /// Every serialized expression must be an `ExprType::Sort`; anything else
    /// is an internal error. A negative `fetch` encodes "no limit".
    fn try_into_sort_preserving_merge_physical_plan(
        &self,
        sort: &protobuf::SortPreservingMergeExecNode,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(
                            expr,
                            registry,
                            input.schema().as_ref(),
                            extension_codec,
                        )?,
                        // The proto stores "ascending"; SortOptions stores
                        // "descending", hence the negation.
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    internal_err!("physical_plan::from_proto() {self:?}")
                }
            })
            .collect::<Result<LexOrdering, _>>()?;
        // fetch < 0 is the proto encoding for "no limit".
        let fetch = if sort.fetch < 0 {
            None
        } else {
            Some(sort.fetch as usize)
        };
        Ok(Arc::new(
            SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
        ))
    }
1524
1525 fn try_into_extension_physical_plan(
1526 &self,
1527 extension: &protobuf::PhysicalExtensionNode,
1528 registry: &dyn FunctionRegistry,
1529 runtime: &RuntimeEnv,
1530 extension_codec: &dyn PhysicalExtensionCodec,
1531 ) -> Result<Arc<dyn ExecutionPlan>> {
1532 let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1533 .inputs
1534 .iter()
1535 .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1536 .collect::<Result<_>>()?;
1537
1538 let extension_node =
1539 extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1540
1541 Ok(extension_node)
1542 }
1543
    /// Deserializes a `NestedLoopJoinExecNode` into a [`NestedLoopJoinExec`].
    ///
    /// Rebuilds both children, the optional [`JoinFilter`], and an optional
    /// output projection (an empty projection list encodes "none").
    fn try_into_nested_loop_join_physical_plan(
        &self,
        join: &protobuf::NestedLoopJoinExecNode,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.left, registry, runtime, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&join.right, registry, runtime, extension_codec)?;
        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
            proto_error(format!(
                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
                join.join_type
            ))
        })?;
        // The optional filter carries its own intermediate schema; the filter
        // expression is parsed against that schema, not the join inputs.
        let filter = join
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    registry, &schema,
                    extension_codec,
                )?;
                // Column indices map filter-schema columns back to a join side.
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
                                "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        // An empty projection list in the proto encodes "no projection".
        let projection = if !join.projection.is_empty() {
            Some(
                join.projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        Ok(Arc::new(NestedLoopJoinExec::try_new(
            left,
            right,
            filter,
            &join_type.into(),
            projection,
        )?))
    }
1617
1618 fn try_into_analyze_physical_plan(
1619 &self,
1620 analyze: &protobuf::AnalyzeExecNode,
1621 registry: &dyn FunctionRegistry,
1622 runtime: &RuntimeEnv,
1623 extension_codec: &dyn PhysicalExtensionCodec,
1624 ) -> Result<Arc<dyn ExecutionPlan>> {
1625 let input: Arc<dyn ExecutionPlan> =
1626 into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1627 Ok(Arc::new(AnalyzeExec::new(
1628 analyze.verbose,
1629 analyze.show_statistics,
1630 input,
1631 Arc::new(convert_required!(analyze.schema)?),
1632 )))
1633 }
1634
1635 fn try_into_json_sink_physical_plan(
1636 &self,
1637 sink: &protobuf::JsonSinkExecNode,
1638 registry: &dyn FunctionRegistry,
1639 runtime: &RuntimeEnv,
1640 extension_codec: &dyn PhysicalExtensionCodec,
1641 ) -> Result<Arc<dyn ExecutionPlan>> {
1642 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1643
1644 let data_sink: JsonSink = sink
1645 .sink
1646 .as_ref()
1647 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1648 .try_into()?;
1649 let sink_schema = input.schema();
1650 let sort_order = sink
1651 .sort_order
1652 .as_ref()
1653 .map(|collection| {
1654 parse_physical_sort_exprs(
1655 &collection.physical_sort_expr_nodes,
1656 registry,
1657 &sink_schema,
1658 extension_codec,
1659 )
1660 .map(LexRequirement::from)
1661 })
1662 .transpose()?;
1663 Ok(Arc::new(DataSinkExec::new(
1664 input,
1665 Arc::new(data_sink),
1666 sort_order,
1667 )))
1668 }
1669
1670 fn try_into_csv_sink_physical_plan(
1671 &self,
1672 sink: &protobuf::CsvSinkExecNode,
1673 registry: &dyn FunctionRegistry,
1674 runtime: &RuntimeEnv,
1675 extension_codec: &dyn PhysicalExtensionCodec,
1676 ) -> Result<Arc<dyn ExecutionPlan>> {
1677 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1678
1679 let data_sink: CsvSink = sink
1680 .sink
1681 .as_ref()
1682 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1683 .try_into()?;
1684 let sink_schema = input.schema();
1685 let sort_order = sink
1686 .sort_order
1687 .as_ref()
1688 .map(|collection| {
1689 parse_physical_sort_exprs(
1690 &collection.physical_sort_expr_nodes,
1691 registry,
1692 &sink_schema,
1693 extension_codec,
1694 )
1695 .map(LexRequirement::from)
1696 })
1697 .transpose()?;
1698 Ok(Arc::new(DataSinkExec::new(
1699 input,
1700 Arc::new(data_sink),
1701 sort_order,
1702 )))
1703 }
1704
    /// Deserializes a `ParquetSinkExecNode` into a [`DataSinkExec`] writing
    /// Parquet.
    ///
    /// Only available when the crate is built with the `parquet` feature;
    /// without it this function panics, since the node cannot be decoded.
    fn try_into_parquet_sink_physical_plan(
        &self,
        sink: &protobuf::ParquetSinkExecNode,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let input =
                into_physical_plan(&sink.input, registry, runtime, extension_codec)?;

            // The sink configuration is a required proto field.
            let data_sink: ParquetSink = sink
                .sink
                .as_ref()
                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                .try_into()?;
            let sink_schema = input.schema();
            // Optional required sort order on the sink input.
            let sort_order = sink
                .sort_order
                .as_ref()
                .map(|collection| {
                    parse_physical_sort_exprs(
                        &collection.physical_sort_expr_nodes,
                        registry,
                        &sink_schema,
                        extension_codec,
                    )
                    .map(LexRequirement::from)
                })
                .transpose()?;
            Ok(Arc::new(DataSinkExec::new(
                input,
                Arc::new(data_sink),
                sort_order,
            )))
        }
        #[cfg(not(feature = "parquet"))]
        panic!("Trying to use ParquetSink without `parquet` feature enabled");
    }
1745
1746 fn try_into_unnest_physical_plan(
1747 &self,
1748 unnest: &protobuf::UnnestExecNode,
1749 registry: &dyn FunctionRegistry,
1750 runtime: &RuntimeEnv,
1751 extension_codec: &dyn PhysicalExtensionCodec,
1752 ) -> Result<Arc<dyn ExecutionPlan>> {
1753 let input =
1754 into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1755
1756 Ok(Arc::new(UnnestExec::new(
1757 input,
1758 unnest
1759 .list_type_columns
1760 .iter()
1761 .map(|c| ListUnnest {
1762 index_in_input_schema: c.index_in_input_schema as _,
1763 depth: c.depth as _,
1764 })
1765 .collect(),
1766 unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1767 Arc::new(convert_required!(unnest.schema)?),
1768 into_required!(unnest.options)?,
1769 )))
1770 }
1771
1772 fn try_from_explain_exec(
1773 exec: &ExplainExec,
1774 _extension_codec: &dyn PhysicalExtensionCodec,
1775 ) -> Result<Self> {
1776 Ok(protobuf::PhysicalPlanNode {
1777 physical_plan_type: Some(PhysicalPlanType::Explain(
1778 protobuf::ExplainExecNode {
1779 schema: Some(exec.schema().as_ref().try_into()?),
1780 stringified_plans: exec
1781 .stringified_plans()
1782 .iter()
1783 .map(|plan| plan.into())
1784 .collect(),
1785 verbose: exec.verbose(),
1786 },
1787 )),
1788 })
1789 }
1790
1791 fn try_from_projection_exec(
1792 exec: &ProjectionExec,
1793 extension_codec: &dyn PhysicalExtensionCodec,
1794 ) -> Result<Self> {
1795 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1796 exec.input().to_owned(),
1797 extension_codec,
1798 )?;
1799 let expr = exec
1800 .expr()
1801 .iter()
1802 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1803 .collect::<Result<Vec<_>>>()?;
1804 let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1805 Ok(protobuf::PhysicalPlanNode {
1806 physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1807 protobuf::ProjectionExecNode {
1808 input: Some(Box::new(input)),
1809 expr,
1810 expr_name,
1811 },
1812 ))),
1813 })
1814 }
1815
1816 fn try_from_analyze_exec(
1817 exec: &AnalyzeExec,
1818 extension_codec: &dyn PhysicalExtensionCodec,
1819 ) -> Result<Self> {
1820 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1821 exec.input().to_owned(),
1822 extension_codec,
1823 )?;
1824 Ok(protobuf::PhysicalPlanNode {
1825 physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1826 protobuf::AnalyzeExecNode {
1827 verbose: exec.verbose(),
1828 show_statistics: exec.show_statistics(),
1829 input: Some(Box::new(input)),
1830 schema: Some(exec.schema().as_ref().try_into()?),
1831 },
1832 ))),
1833 })
1834 }
1835
1836 fn try_from_filter_exec(
1837 exec: &FilterExec,
1838 extension_codec: &dyn PhysicalExtensionCodec,
1839 ) -> Result<Self> {
1840 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1841 exec.input().to_owned(),
1842 extension_codec,
1843 )?;
1844 Ok(protobuf::PhysicalPlanNode {
1845 physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1846 protobuf::FilterExecNode {
1847 input: Some(Box::new(input)),
1848 expr: Some(serialize_physical_expr(
1849 exec.predicate(),
1850 extension_codec,
1851 )?),
1852 default_filter_selectivity: exec.default_selectivity() as u32,
1853 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1854 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1855 }),
1856 },
1857 ))),
1858 })
1859 }
1860
1861 fn try_from_global_limit_exec(
1862 limit: &GlobalLimitExec,
1863 extension_codec: &dyn PhysicalExtensionCodec,
1864 ) -> Result<Self> {
1865 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1866 limit.input().to_owned(),
1867 extension_codec,
1868 )?;
1869
1870 Ok(protobuf::PhysicalPlanNode {
1871 physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1872 protobuf::GlobalLimitExecNode {
1873 input: Some(Box::new(input)),
1874 skip: limit.skip() as u32,
1875 fetch: match limit.fetch() {
1876 Some(n) => n as i64,
1877 _ => -1, },
1879 },
1880 ))),
1881 })
1882 }
1883
1884 fn try_from_local_limit_exec(
1885 limit: &LocalLimitExec,
1886 extension_codec: &dyn PhysicalExtensionCodec,
1887 ) -> Result<Self> {
1888 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1889 limit.input().to_owned(),
1890 extension_codec,
1891 )?;
1892 Ok(protobuf::PhysicalPlanNode {
1893 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1894 protobuf::LocalLimitExecNode {
1895 input: Some(Box::new(input)),
1896 fetch: limit.fetch() as u32,
1897 },
1898 ))),
1899 })
1900 }
1901
    /// Serializes a [`HashJoinExec`] into a `HashJoinExecNode` protobuf.
    fn try_from_hash_join_exec(
        exec: &HashJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        // Equi-join keys: one serialized expression per side.
        let on: Vec<protobuf::JoinOn> = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        // Optional non-equi filter: serialize its expression, the mapping of
        // filter-schema columns back to join sides, and its own schema.
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let partition_mode = match exec.partition_mode() {
            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
            PartitionMode::Auto => protobuf::PartitionMode::Auto,
        };

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
                protobuf::HashJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    partition_mode: partition_mode.into(),
                    null_equals_null: exec.null_equals_null(),
                    filter,
                    // An absent projection is encoded as an empty vector.
                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                    }),
                },
            ))),
        })
    }
1976
    /// Serializes a [`SymmetricHashJoinExec`] into a
    /// `SymmetricHashJoinExecNode` protobuf.
    fn try_from_symmetric_hash_join_exec(
        exec: &SymmetricHashJoinExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.left().to_owned(),
            extension_codec,
        )?;
        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.right().to_owned(),
            extension_codec,
        )?;
        // Equi-join keys: one serialized expression per side.
        let on = exec
            .on()
            .iter()
            .map(|tuple| {
                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
                Ok::<_, DataFusionError>(protobuf::JoinOn {
                    left: Some(l),
                    right: Some(r),
                })
            })
            .collect::<Result<_>>()?;
        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
        // Optional non-equi filter: serialize its expression, the mapping of
        // filter-schema columns back to join sides, and its own schema.
        let filter = exec
            .filter()
            .as_ref()
            .map(|f| {
                let expression =
                    serialize_physical_expr(f.expression(), extension_codec)?;
                let column_indices = f
                    .column_indices()
                    .iter()
                    .map(|i| {
                        let side: protobuf::JoinSide = i.side.to_owned().into();
                        protobuf::ColumnIndex {
                            index: i.index as u32,
                            side: side.into(),
                        }
                    })
                    .collect();
                let schema = f.schema().as_ref().try_into()?;
                Ok(protobuf::JoinFilter {
                    expression: Some(expression),
                    column_indices,
                    schema: Some(schema),
                })
            })
            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;

        let partition_mode = match exec.partition_mode() {
            StreamJoinPartitionMode::SinglePartition => {
                protobuf::StreamPartitionMode::SinglePartition
            }
            StreamJoinPartitionMode::Partitioned => {
                protobuf::StreamPartitionMode::PartitionedExec
            }
        };

        // Absent sort orders are encoded as empty lists; note the asc flag is
        // the negation of SortOptions::descending.
        let left_sort_exprs = exec
            .left_sort_exprs()
            .map(|exprs| {
                exprs
                    .iter()
                    .map(|expr| {
                        Ok(protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?
            .unwrap_or(vec![]);

        let right_sort_exprs = exec
            .right_sort_exprs()
            .map(|exprs| {
                exprs
                    .iter()
                    .map(|expr| {
                        Ok(protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?
            .unwrap_or(vec![]);

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
                protobuf::SymmetricHashJoinExecNode {
                    left: Some(Box::new(left)),
                    right: Some(Box::new(right)),
                    on,
                    join_type: join_type.into(),
                    partition_mode: partition_mode.into(),
                    null_equals_null: exec.null_equals_null(),
                    left_sort_exprs,
                    right_sort_exprs,
                    filter,
                },
            ))),
        })
    }
2093
2094 fn try_from_cross_join_exec(
2095 exec: &CrossJoinExec,
2096 extension_codec: &dyn PhysicalExtensionCodec,
2097 ) -> Result<Self> {
2098 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2099 exec.left().to_owned(),
2100 extension_codec,
2101 )?;
2102 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2103 exec.right().to_owned(),
2104 extension_codec,
2105 )?;
2106 Ok(protobuf::PhysicalPlanNode {
2107 physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2108 protobuf::CrossJoinExecNode {
2109 left: Some(Box::new(left)),
2110 right: Some(Box::new(right)),
2111 },
2112 ))),
2113 })
2114 }
2115
    /// Serializes an [`AggregateExec`] into an `AggregateExecNode` protobuf.
    fn try_from_aggregate_exec(
        exec: &AggregateExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        // Flattened null-mask of all grouping sets.
        let groups: Vec<bool> = exec
            .group_expr()
            .groups()
            .iter()
            .flatten()
            .copied()
            .collect();

        let group_names = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| expr.1.to_owned())
            .collect();

        // One optional filter per aggregate expression, kept positional.
        let filter = exec
            .filter_expr()
            .iter()
            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg = exec
            .aggr_expr()
            .iter()
            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg_names = exec
            .aggr_expr()
            .iter()
            .map(|expr| expr.name().to_string())
            .collect::<Vec<_>>();

        let agg_mode = match exec.mode() {
            AggregateMode::Partial => protobuf::AggregateMode::Partial,
            AggregateMode::Final => protobuf::AggregateMode::Final,
            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
            AggregateMode::Single => protobuf::AggregateMode::Single,
            AggregateMode::SinglePartitioned => {
                protobuf::AggregateMode::SinglePartitioned
            }
        };
        let input_schema = exec.input_schema();
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        let null_expr = exec
            .group_expr()
            .null_expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let group_expr = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let limit = exec.limit().map(|value| protobuf::AggLimit {
            limit: value as u64,
        });

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                protobuf::AggregateExecNode {
                    group_expr,
                    group_expr_name: group_names,
                    aggr_expr: agg,
                    filter_expr: filter,
                    aggr_expr_name: agg_names,
                    mode: agg_mode as i32,
                    input: Some(Box::new(input)),
                    input_schema: Some(input_schema.as_ref().try_into()?),
                    null_expr,
                    groups,
                    limit,
                },
            ))),
        })
    }
2204
2205 fn try_from_empty_exec(
2206 empty: &EmptyExec,
2207 _extension_codec: &dyn PhysicalExtensionCodec,
2208 ) -> Result<Self> {
2209 let schema = empty.schema().as_ref().try_into()?;
2210 Ok(protobuf::PhysicalPlanNode {
2211 physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2212 schema: Some(schema),
2213 })),
2214 })
2215 }
2216
2217 fn try_from_placeholder_row_exec(
2218 empty: &PlaceholderRowExec,
2219 _extension_codec: &dyn PhysicalExtensionCodec,
2220 ) -> Result<Self> {
2221 let schema = empty.schema().as_ref().try_into()?;
2222 Ok(protobuf::PhysicalPlanNode {
2223 physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2224 protobuf::PlaceholderRowExecNode {
2225 schema: Some(schema),
2226 },
2227 )),
2228 })
2229 }
2230
2231 fn try_from_coalesce_batches_exec(
2232 coalesce_batches: &CoalesceBatchesExec,
2233 extension_codec: &dyn PhysicalExtensionCodec,
2234 ) -> Result<Self> {
2235 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2236 coalesce_batches.input().to_owned(),
2237 extension_codec,
2238 )?;
2239 Ok(protobuf::PhysicalPlanNode {
2240 physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2241 protobuf::CoalesceBatchesExecNode {
2242 input: Some(Box::new(input)),
2243 target_batch_size: coalesce_batches.target_batch_size() as u32,
2244 fetch: coalesce_batches.fetch().map(|n| n as u32),
2245 },
2246 ))),
2247 })
2248 }
2249
    /// Attempts to serialize a `DataSourceExec` into a protobuf plan node.
    ///
    /// Returns `Ok(None)` when the wrapped data source is not one of the
    /// file-based sources this module knows how to encode (CSV, JSON, and —
    /// behind feature flags — Parquet and Avro), letting the caller fall back
    /// to an extension codec.
    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let data_source = data_source_exec.data_source();
        // CSV scan: a FileScanConfig whose file source downcasts to
        // CsvSource. All CSV parsing options are copied onto the proto node.
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                extension_codec,
                            )?),
                            has_header: csv_config.has_header(),
                            // Single-byte options are transported as strings;
                            // byte_to_string fails on non-UTF-8 bytes.
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                    byte_to_string(comment, "comment")?,
                                ))
                            } else {
                                None
                            },
                            newlines_in_values: maybe_csv.newlines_in_values(),
                        },
                    )),
                }));
            }
        }

        // JSON scan: only the base file-scan configuration is needed; the
        // JsonSource itself carries no extra serializable state here.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Parquet scan (feature-gated): also serializes the optional pushdown
        // predicate and the table-level parquet options.
        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .predicate()
                .map(|pred| serialize_physical_expr(pred, extension_codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            extension_codec,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        // Avro scan (feature-gated): like JSON, only the base configuration
        // is serialized.
        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Unrecognized data source: let the caller decide (extension codec).
        Ok(None)
    }
2352
2353 fn try_from_coalesce_partitions_exec(
2354 exec: &CoalescePartitionsExec,
2355 extension_codec: &dyn PhysicalExtensionCodec,
2356 ) -> Result<Self> {
2357 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2358 exec.input().to_owned(),
2359 extension_codec,
2360 )?;
2361 Ok(protobuf::PhysicalPlanNode {
2362 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2363 protobuf::CoalescePartitionsExecNode {
2364 input: Some(Box::new(input)),
2365 fetch: exec.fetch().map(|f| f as u32),
2366 },
2367 ))),
2368 })
2369 }
2370
2371 fn try_from_repartition_exec(
2372 exec: &RepartitionExec,
2373 extension_codec: &dyn PhysicalExtensionCodec,
2374 ) -> Result<Self> {
2375 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2376 exec.input().to_owned(),
2377 extension_codec,
2378 )?;
2379
2380 let pb_partitioning =
2381 serialize_partitioning(exec.partitioning(), extension_codec)?;
2382
2383 Ok(protobuf::PhysicalPlanNode {
2384 physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2385 protobuf::RepartitionExecNode {
2386 input: Some(Box::new(input)),
2387 partitioning: Some(pb_partitioning),
2388 },
2389 ))),
2390 })
2391 }
2392
2393 fn try_from_sort_exec(
2394 exec: &SortExec,
2395 extension_codec: &dyn PhysicalExtensionCodec,
2396 ) -> Result<Self> {
2397 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2398 exec.input().to_owned(),
2399 extension_codec,
2400 )?;
2401 let expr = exec
2402 .expr()
2403 .iter()
2404 .map(|expr| {
2405 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2406 expr: Some(Box::new(serialize_physical_expr(
2407 &expr.expr,
2408 extension_codec,
2409 )?)),
2410 asc: !expr.options.descending,
2411 nulls_first: expr.options.nulls_first,
2412 });
2413 Ok(protobuf::PhysicalExprNode {
2414 expr_type: Some(ExprType::Sort(sort_expr)),
2415 })
2416 })
2417 .collect::<Result<Vec<_>>>()?;
2418 Ok(protobuf::PhysicalPlanNode {
2419 physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2420 protobuf::SortExecNode {
2421 input: Some(Box::new(input)),
2422 expr,
2423 fetch: match exec.fetch() {
2424 Some(n) => n as i64,
2425 _ => -1,
2426 },
2427 preserve_partitioning: exec.preserve_partitioning(),
2428 },
2429 ))),
2430 })
2431 }
2432
2433 fn try_from_union_exec(
2434 union: &UnionExec,
2435 extension_codec: &dyn PhysicalExtensionCodec,
2436 ) -> Result<Self> {
2437 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2438 for input in union.inputs() {
2439 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2440 input.to_owned(),
2441 extension_codec,
2442 )?);
2443 }
2444 Ok(protobuf::PhysicalPlanNode {
2445 physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2446 inputs,
2447 })),
2448 })
2449 }
2450
2451 fn try_from_interleave_exec(
2452 interleave: &InterleaveExec,
2453 extension_codec: &dyn PhysicalExtensionCodec,
2454 ) -> Result<Self> {
2455 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2456 for input in interleave.inputs() {
2457 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2458 input.to_owned(),
2459 extension_codec,
2460 )?);
2461 }
2462 Ok(protobuf::PhysicalPlanNode {
2463 physical_plan_type: Some(PhysicalPlanType::Interleave(
2464 protobuf::InterleaveExecNode { inputs },
2465 )),
2466 })
2467 }
2468
2469 fn try_from_sort_preserving_merge_exec(
2470 exec: &SortPreservingMergeExec,
2471 extension_codec: &dyn PhysicalExtensionCodec,
2472 ) -> Result<Self> {
2473 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2474 exec.input().to_owned(),
2475 extension_codec,
2476 )?;
2477 let expr = exec
2478 .expr()
2479 .iter()
2480 .map(|expr| {
2481 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2482 expr: Some(Box::new(serialize_physical_expr(
2483 &expr.expr,
2484 extension_codec,
2485 )?)),
2486 asc: !expr.options.descending,
2487 nulls_first: expr.options.nulls_first,
2488 });
2489 Ok(protobuf::PhysicalExprNode {
2490 expr_type: Some(ExprType::Sort(sort_expr)),
2491 })
2492 })
2493 .collect::<Result<Vec<_>>>()?;
2494 Ok(protobuf::PhysicalPlanNode {
2495 physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2496 protobuf::SortPreservingMergeExecNode {
2497 input: Some(Box::new(input)),
2498 expr,
2499 fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2500 },
2501 ))),
2502 })
2503 }
2504
2505 fn try_from_nested_loop_join_exec(
2506 exec: &NestedLoopJoinExec,
2507 extension_codec: &dyn PhysicalExtensionCodec,
2508 ) -> Result<Self> {
2509 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2510 exec.left().to_owned(),
2511 extension_codec,
2512 )?;
2513 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2514 exec.right().to_owned(),
2515 extension_codec,
2516 )?;
2517
2518 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2519 let filter = exec
2520 .filter()
2521 .as_ref()
2522 .map(|f| {
2523 let expression =
2524 serialize_physical_expr(f.expression(), extension_codec)?;
2525 let column_indices = f
2526 .column_indices()
2527 .iter()
2528 .map(|i| {
2529 let side: protobuf::JoinSide = i.side.to_owned().into();
2530 protobuf::ColumnIndex {
2531 index: i.index as u32,
2532 side: side.into(),
2533 }
2534 })
2535 .collect();
2536 let schema = f.schema().as_ref().try_into()?;
2537 Ok(protobuf::JoinFilter {
2538 expression: Some(expression),
2539 column_indices,
2540 schema: Some(schema),
2541 })
2542 })
2543 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2544
2545 Ok(protobuf::PhysicalPlanNode {
2546 physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2547 protobuf::NestedLoopJoinExecNode {
2548 left: Some(Box::new(left)),
2549 right: Some(Box::new(right)),
2550 join_type: join_type.into(),
2551 filter,
2552 projection: exec.projection().map_or_else(Vec::new, |v| {
2553 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2554 }),
2555 },
2556 ))),
2557 })
2558 }
2559
2560 fn try_from_window_agg_exec(
2561 exec: &WindowAggExec,
2562 extension_codec: &dyn PhysicalExtensionCodec,
2563 ) -> Result<Self> {
2564 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2565 exec.input().to_owned(),
2566 extension_codec,
2567 )?;
2568
2569 let window_expr = exec
2570 .window_expr()
2571 .iter()
2572 .map(|e| serialize_physical_window_expr(e, extension_codec))
2573 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2574
2575 let partition_keys = exec
2576 .partition_keys()
2577 .iter()
2578 .map(|e| serialize_physical_expr(e, extension_codec))
2579 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2580
2581 Ok(protobuf::PhysicalPlanNode {
2582 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2583 protobuf::WindowAggExecNode {
2584 input: Some(Box::new(input)),
2585 window_expr,
2586 partition_keys,
2587 input_order_mode: None,
2588 },
2589 ))),
2590 })
2591 }
2592
2593 fn try_from_bounded_window_agg_exec(
2594 exec: &BoundedWindowAggExec,
2595 extension_codec: &dyn PhysicalExtensionCodec,
2596 ) -> Result<Self> {
2597 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2598 exec.input().to_owned(),
2599 extension_codec,
2600 )?;
2601
2602 let window_expr = exec
2603 .window_expr()
2604 .iter()
2605 .map(|e| serialize_physical_window_expr(e, extension_codec))
2606 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2607
2608 let partition_keys = exec
2609 .partition_keys()
2610 .iter()
2611 .map(|e| serialize_physical_expr(e, extension_codec))
2612 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2613
2614 let input_order_mode = match &exec.input_order_mode {
2615 InputOrderMode::Linear => {
2616 window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2617 }
2618 InputOrderMode::PartiallySorted(columns) => {
2619 window_agg_exec_node::InputOrderMode::PartiallySorted(
2620 protobuf::PartiallySortedInputOrderMode {
2621 columns: columns.iter().map(|c| *c as u64).collect(),
2622 },
2623 )
2624 }
2625 InputOrderMode::Sorted => {
2626 window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2627 }
2628 };
2629
2630 Ok(protobuf::PhysicalPlanNode {
2631 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2632 protobuf::WindowAggExecNode {
2633 input: Some(Box::new(input)),
2634 window_expr,
2635 partition_keys,
2636 input_order_mode: Some(input_order_mode),
2637 },
2638 ))),
2639 })
2640 }
2641
    /// Attempts to serialize a `DataSinkExec` into a protobuf plan node.
    ///
    /// Returns `Ok(None)` when the sink is not one of the known file sinks
    /// (JSON, CSV, and — behind the `parquet` feature — Parquet), letting the
    /// caller fall back to an extension codec.
    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
        // Serialize the optional required sort order of the incoming data:
        // each requirement becomes a PhysicalSortExprNode.
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            // Proto stores an ascending flag, the inverse of
                            // SortOptions::descending.
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        // JSON sink.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // CSV sink.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Parquet sink (feature-gated).
        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Unknown sink type: let the caller decide (extension codec).
        Ok(None)
    }
2718
2719 fn try_from_unnest_exec(
2720 exec: &UnnestExec,
2721 extension_codec: &dyn PhysicalExtensionCodec,
2722 ) -> Result<Self> {
2723 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2724 exec.input().to_owned(),
2725 extension_codec,
2726 )?;
2727
2728 Ok(protobuf::PhysicalPlanNode {
2729 physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2730 protobuf::UnnestExecNode {
2731 input: Some(Box::new(input)),
2732 schema: Some(exec.schema().try_into()?),
2733 list_type_columns: exec
2734 .list_column_indices()
2735 .iter()
2736 .map(|c| ProtoListUnnest {
2737 index_in_input_schema: c.index_in_input_schema as _,
2738 depth: c.depth as _,
2739 })
2740 .collect(),
2741 struct_type_columns: exec
2742 .struct_column_indices()
2743 .iter()
2744 .map(|c| *c as _)
2745 .collect(),
2746 options: Some(exec.options().into()),
2747 },
2748 ))),
2749 })
2750 }
2751}
2752
/// Two-way conversion between an in-memory [`ExecutionPlan`] tree and its
/// serialized protobuf representation.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes `Self` from a protobuf-encoded byte buffer.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` as protobuf bytes into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Reconstructs the physical plan, resolving functions through `registry`
    /// and delegating custom nodes to `extension_codec`.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the serializable representation from a physical plan,
    /// delegating unknown node types to `extension_codec`.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
2777
/// Extension point for (de)serializing custom physical plan nodes,
/// user-defined functions, and custom physical expressions that the built-in
/// protobuf schema does not cover.
///
/// The provided default methods either return a "not implemented" error
/// (decode paths and expression encode) or succeed writing nothing (UDF
/// encode paths).
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom execution plan node from `buf`; `inputs` are the
    /// already-decoded children and `registry` resolves functions.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes a custom execution plan node into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF by `name` from its optional payload.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes extra payload for a scalar UDF; the default writes nothing.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom physical expression from `buf`; `_inputs` are its
    /// already-decoded child expressions.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a custom physical expression into `buf`.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF by `name` from its optional payload.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes extra payload for an aggregate UDF; the default writes nothing.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF by `name` from its optional payload.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes extra payload for a window UDF; the default writes nothing.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
2830
/// A codec with no extension support: plan-node decode/encode always return
/// a "not implemented" error. Suitable when the plan is known to contain no
/// extension nodes.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
2833
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    // The default codec supports no extension plan nodes; both required
    // methods report "not implemented". All UDF/expression methods keep
    // their trait-provided defaults.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
2852
2853fn into_physical_plan(
2854 node: &Option<Box<protobuf::PhysicalPlanNode>>,
2855 registry: &dyn FunctionRegistry,
2856 runtime: &RuntimeEnv,
2857 extension_codec: &dyn PhysicalExtensionCodec,
2858) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2859 if let Some(field) = node {
2860 field.try_into_physical_plan(registry, runtime, extension_codec)
2861 } else {
2862 Err(proto_error("Missing required field in protobuf"))
2863 }
2864}