1use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25 parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26 parse_physical_window_expr, parse_protobuf_file_scan_config,
27 parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31 serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37 self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::SchemaRef;
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53 CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::empty::EmptyExec;
68use datafusion::physical_plan::explain::ExplainExec;
69use datafusion::physical_plan::expressions::PhysicalSortExpr;
70use datafusion::physical_plan::filter::FilterExec;
71use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
72use datafusion::physical_plan::joins::{
73 CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
74};
75use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
76use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
77use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
78use datafusion::physical_plan::projection::ProjectionExec;
79use datafusion::physical_plan::repartition::RepartitionExec;
80use datafusion::physical_plan::sorts::sort::SortExec;
81use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
82use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
83use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
84use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
85use datafusion::physical_plan::{
86 ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
87};
88use datafusion_common::config::TableParquetOptions;
89use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
90use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
91
92use prost::bytes::BufMut;
93use prost::Message;
94
95pub mod from_proto;
96pub mod to_proto;
97
/// Conversion between [`protobuf::PhysicalPlanNode`] and DataFusion
/// [`ExecutionPlan`]s.
///
/// Operators handled by this module are mapped to dedicated
/// `PhysicalPlanType` variants; any other plan is handed to the
/// [`PhysicalExtensionCodec`] and stored as a `PhysicalPlanType::Extension`
/// node.
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    /// Decodes a `PhysicalPlanNode` from protobuf-encoded bytes.
    ///
    /// # Errors
    /// Returns `DataFusionError::Internal` wrapping the prost decode error if
    /// `buf` is not a valid encoding.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
        })
    }

    /// Encodes this node into `buf` using protobuf.
    ///
    /// # Errors
    /// Returns `DataFusionError::Internal` wrapping the prost encode error.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
        })
    }

    /// Reconstructs an [`ExecutionPlan`] from this node.
    ///
    /// Dispatches on `physical_plan_type` to the matching
    /// `try_into_*_physical_plan` helper, forwarding `registry`, `runtime`
    /// and `extension_codec` unchanged.
    ///
    /// # Errors
    /// Returns a proto error if `physical_plan_type` is unset; otherwise
    /// returns whatever the selected helper returns.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        // One arm per `PhysicalPlanType` variant; each delegates to a helper
        // defined on this type.
        match plan {
            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
                explain,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Projection(projection) => self
                .try_into_projection_physical_plan(
                    projection,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
                filter,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetScan(scan) => self
                .try_into_parquet_scan_physical_plan(
                    scan,
                    registry,
                    runtime,
                    extension_codec,
                ),
            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
            PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
                scan,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
                merge,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Repartition(repart) => self
                .try_into_repartition_physical_plan(
                    repart,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::GlobalLimit(limit) => self
                .try_into_global_limit_physical_plan(
                    limit,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::LocalLimit(limit) => self
                .try_into_local_limit_physical_plan(
                    limit,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
                window_agg,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Aggregate(hash_agg) => self
                .try_into_aggregate_physical_plan(
                    hash_agg,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::HashJoin(hashjoin) => self
                .try_into_hash_join_physical_plan(
                    hashjoin,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
                union,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::Interleave(interleave) => self
                .try_into_interleave_physical_plan(
                    interleave,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::CrossJoin(crossjoin) => self
                .try_into_cross_join_physical_plan(
                    crossjoin,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
                empty,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::PlaceholderRow(placeholder) => self
                .try_into_placeholder_row_physical_plan(
                    placeholder,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(
                    sort,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Extension(extension) => self
                .try_into_extension_physical_plan(
                    extension,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::NestedLoopJoin(join) => self
                .try_into_nested_loop_join_physical_plan(
                    join,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
                analyze,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
                sink,
                registry,
                runtime,
                extension_codec,
            ),
            PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
                sink,
                registry,
                runtime,
                extension_codec,
            ),

            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => self
                .try_into_parquet_sink_physical_plan(
                    sink,
                    registry,
                    runtime,
                    extension_codec,
                ),
            PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
                unnest,
                registry,
                runtime,
                extension_codec,
            ),
        }
    }

    /// Serializes an [`ExecutionPlan`] into a `PhysicalPlanNode`.
    ///
    /// Known operators are detected by downcasting and encoded with the
    /// matching `try_from_*` helper, checked in the order below. Any plan not
    /// matched is encoded by `extension_codec` and stored as a
    /// `PhysicalPlanType::Extension` node. That node's `inputs` are the plan's
    /// children, each serialized recursively with this same function.
    ///
    /// # Errors
    /// Returns any error from a `try_from_*` helper or from serializing a
    /// child. Returns an internal error if `extension_codec` fails to encode a
    /// plan that no built-in branch handled.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension-codec fallback below; `plan`
        // is shadowed by its `as_any()` view for the downcast chain.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        // This helper returns `Result<Option<_>>`. On `Ok(None)` we do not
        // return here, so the plan falls through to the remaining checks and
        // finally the extension codec.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        // Same `Result<Option<_>>` pattern as the data source branch above:
        // `Ok(None)` falls through instead of returning.
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        // Fallback: the codec encodes only this node's payload. The children
        // are serialized separately and recorded as the extension node's
        // `inputs`.
        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
543
544impl protobuf::PhysicalPlanNode {
545 fn try_into_explain_physical_plan(
546 &self,
547 explain: &protobuf::ExplainExecNode,
548 _registry: &dyn FunctionRegistry,
549 _runtime: &RuntimeEnv,
550 _extension_codec: &dyn PhysicalExtensionCodec,
551 ) -> Result<Arc<dyn ExecutionPlan>> {
552 Ok(Arc::new(ExplainExec::new(
553 Arc::new(explain.schema.as_ref().unwrap().try_into()?),
554 explain
555 .stringified_plans
556 .iter()
557 .map(|plan| plan.into())
558 .collect(),
559 explain.verbose,
560 )))
561 }
562
563 fn try_into_projection_physical_plan(
564 &self,
565 projection: &protobuf::ProjectionExecNode,
566 registry: &dyn FunctionRegistry,
567 runtime: &RuntimeEnv,
568 extension_codec: &dyn PhysicalExtensionCodec,
569 ) -> Result<Arc<dyn ExecutionPlan>> {
570 let input: Arc<dyn ExecutionPlan> =
571 into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
572 let exprs = projection
573 .expr
574 .iter()
575 .zip(projection.expr_name.iter())
576 .map(|(expr, name)| {
577 Ok((
578 parse_physical_expr(
579 expr,
580 registry,
581 input.schema().as_ref(),
582 extension_codec,
583 )?,
584 name.to_string(),
585 ))
586 })
587 .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
588 Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
589 }
590
591 fn try_into_filter_physical_plan(
592 &self,
593 filter: &protobuf::FilterExecNode,
594 registry: &dyn FunctionRegistry,
595 runtime: &RuntimeEnv,
596 extension_codec: &dyn PhysicalExtensionCodec,
597 ) -> Result<Arc<dyn ExecutionPlan>> {
598 let input: Arc<dyn ExecutionPlan> =
599 into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
600 let predicate = filter
601 .expr
602 .as_ref()
603 .map(|expr| {
604 parse_physical_expr(
605 expr,
606 registry,
607 input.schema().as_ref(),
608 extension_codec,
609 )
610 })
611 .transpose()?
612 .ok_or_else(|| {
613 DataFusionError::Internal(
614 "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
615 )
616 })?;
617 let filter_selectivity = filter.default_filter_selectivity.try_into();
618 let projection = if !filter.projection.is_empty() {
619 Some(
620 filter
621 .projection
622 .iter()
623 .map(|i| *i as usize)
624 .collect::<Vec<_>>(),
625 )
626 } else {
627 None
628 };
629 let filter =
630 FilterExec::try_new(predicate, input)?.with_projection(projection)?;
631 match filter_selectivity {
632 Ok(filter_selectivity) => Ok(Arc::new(
633 filter.with_default_selectivity(filter_selectivity)?,
634 )),
635 Err(_) => Err(DataFusionError::Internal(
636 "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
637 )),
638 }
639 }
640
641 fn try_into_csv_scan_physical_plan(
642 &self,
643 scan: &protobuf::CsvScanExecNode,
644 registry: &dyn FunctionRegistry,
645 _runtime: &RuntimeEnv,
646 extension_codec: &dyn PhysicalExtensionCodec,
647 ) -> Result<Arc<dyn ExecutionPlan>> {
648 let escape =
649 if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
650 &scan.optional_escape
651 {
652 Some(str_to_byte(escape, "escape")?)
653 } else {
654 None
655 };
656
657 let comment = if let Some(
658 protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
659 ) = &scan.optional_comment
660 {
661 Some(str_to_byte(comment, "comment")?)
662 } else {
663 None
664 };
665
666 let source = Arc::new(
667 CsvSource::new(
668 scan.has_header,
669 str_to_byte(&scan.delimiter, "delimiter")?,
670 0,
671 )
672 .with_escape(escape)
673 .with_comment(comment),
674 );
675
676 let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
677 scan.base_conf.as_ref().unwrap(),
678 registry,
679 extension_codec,
680 source,
681 )?)
682 .with_newlines_in_values(scan.newlines_in_values)
683 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
684 .build();
685 Ok(DataSourceExec::from_data_source(conf))
686 }
687
688 fn try_into_json_scan_physical_plan(
689 &self,
690 scan: &protobuf::JsonScanExecNode,
691 registry: &dyn FunctionRegistry,
692 _runtime: &RuntimeEnv,
693 extension_codec: &dyn PhysicalExtensionCodec,
694 ) -> Result<Arc<dyn ExecutionPlan>> {
695 let scan_conf = parse_protobuf_file_scan_config(
696 scan.base_conf.as_ref().unwrap(),
697 registry,
698 extension_codec,
699 Arc::new(JsonSource::new()),
700 )?;
701 Ok(DataSourceExec::from_data_source(scan_conf))
702 }
703
704 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
705 fn try_into_parquet_scan_physical_plan(
706 &self,
707 scan: &protobuf::ParquetScanExecNode,
708 registry: &dyn FunctionRegistry,
709 _runtime: &RuntimeEnv,
710 extension_codec: &dyn PhysicalExtensionCodec,
711 ) -> Result<Arc<dyn ExecutionPlan>> {
712 #[cfg(feature = "parquet")]
713 {
714 let schema =
715 parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
716 let predicate = scan
717 .predicate
718 .as_ref()
719 .map(|expr| {
720 parse_physical_expr(expr, registry, schema.as_ref(), extension_codec)
721 })
722 .transpose()?;
723 let mut options = TableParquetOptions::default();
724
725 if let Some(table_options) = scan.parquet_options.as_ref() {
726 options = table_options.try_into()?;
727 }
728 let mut source = ParquetSource::new(options);
729
730 if let Some(predicate) = predicate {
731 source = source.with_predicate(Arc::clone(&schema), predicate);
732 }
733 let base_config = parse_protobuf_file_scan_config(
734 scan.base_conf.as_ref().unwrap(),
735 registry,
736 extension_codec,
737 Arc::new(source),
738 )?;
739 Ok(DataSourceExec::from_data_source(base_config))
740 }
741 #[cfg(not(feature = "parquet"))]
742 panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
743 }
744
745 #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
746 fn try_into_avro_scan_physical_plan(
747 &self,
748 scan: &protobuf::AvroScanExecNode,
749 registry: &dyn FunctionRegistry,
750 _runtime: &RuntimeEnv,
751 extension_codec: &dyn PhysicalExtensionCodec,
752 ) -> Result<Arc<dyn ExecutionPlan>> {
753 #[cfg(feature = "avro")]
754 {
755 let conf = parse_protobuf_file_scan_config(
756 scan.base_conf.as_ref().unwrap(),
757 registry,
758 extension_codec,
759 Arc::new(AvroSource::new()),
760 )?;
761 Ok(DataSourceExec::from_data_source(conf))
762 }
763 #[cfg(not(feature = "avro"))]
764 panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
765 }
766
767 fn try_into_coalesce_batches_physical_plan(
768 &self,
769 coalesce_batches: &protobuf::CoalesceBatchesExecNode,
770 registry: &dyn FunctionRegistry,
771 runtime: &RuntimeEnv,
772 extension_codec: &dyn PhysicalExtensionCodec,
773 ) -> Result<Arc<dyn ExecutionPlan>> {
774 let input: Arc<dyn ExecutionPlan> = into_physical_plan(
775 &coalesce_batches.input,
776 registry,
777 runtime,
778 extension_codec,
779 )?;
780 Ok(Arc::new(
781 CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
782 .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
783 ))
784 }
785
786 fn try_into_merge_physical_plan(
787 &self,
788 merge: &protobuf::CoalescePartitionsExecNode,
789 registry: &dyn FunctionRegistry,
790 runtime: &RuntimeEnv,
791 extension_codec: &dyn PhysicalExtensionCodec,
792 ) -> Result<Arc<dyn ExecutionPlan>> {
793 let input: Arc<dyn ExecutionPlan> =
794 into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
795 Ok(Arc::new(CoalescePartitionsExec::new(input)))
796 }
797
798 fn try_into_repartition_physical_plan(
799 &self,
800 repart: &protobuf::RepartitionExecNode,
801 registry: &dyn FunctionRegistry,
802 runtime: &RuntimeEnv,
803 extension_codec: &dyn PhysicalExtensionCodec,
804 ) -> Result<Arc<dyn ExecutionPlan>> {
805 let input: Arc<dyn ExecutionPlan> =
806 into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
807 let partitioning = parse_protobuf_partitioning(
808 repart.partitioning.as_ref(),
809 registry,
810 input.schema().as_ref(),
811 extension_codec,
812 )?;
813 Ok(Arc::new(RepartitionExec::try_new(
814 input,
815 partitioning.unwrap(),
816 )?))
817 }
818
819 fn try_into_global_limit_physical_plan(
820 &self,
821 limit: &protobuf::GlobalLimitExecNode,
822 registry: &dyn FunctionRegistry,
823 runtime: &RuntimeEnv,
824 extension_codec: &dyn PhysicalExtensionCodec,
825 ) -> Result<Arc<dyn ExecutionPlan>> {
826 let input: Arc<dyn ExecutionPlan> =
827 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
828 let fetch = if limit.fetch >= 0 {
829 Some(limit.fetch as usize)
830 } else {
831 None
832 };
833 Ok(Arc::new(GlobalLimitExec::new(
834 input,
835 limit.skip as usize,
836 fetch,
837 )))
838 }
839
840 fn try_into_local_limit_physical_plan(
841 &self,
842 limit: &protobuf::LocalLimitExecNode,
843 registry: &dyn FunctionRegistry,
844 runtime: &RuntimeEnv,
845 extension_codec: &dyn PhysicalExtensionCodec,
846 ) -> Result<Arc<dyn ExecutionPlan>> {
847 let input: Arc<dyn ExecutionPlan> =
848 into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
849 Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
850 }
851
852 fn try_into_window_physical_plan(
853 &self,
854 window_agg: &protobuf::WindowAggExecNode,
855 registry: &dyn FunctionRegistry,
856 runtime: &RuntimeEnv,
857 extension_codec: &dyn PhysicalExtensionCodec,
858 ) -> Result<Arc<dyn ExecutionPlan>> {
859 let input: Arc<dyn ExecutionPlan> =
860 into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
861 let input_schema = input.schema();
862
863 let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
864 .window_expr
865 .iter()
866 .map(|window_expr| {
867 parse_physical_window_expr(
868 window_expr,
869 registry,
870 input_schema.as_ref(),
871 extension_codec,
872 )
873 })
874 .collect::<Result<Vec<_>, _>>()?;
875
876 let partition_keys = window_agg
877 .partition_keys
878 .iter()
879 .map(|expr| {
880 parse_physical_expr(
881 expr,
882 registry,
883 input.schema().as_ref(),
884 extension_codec,
885 )
886 })
887 .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
888
889 if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
890 let input_order_mode = match input_order_mode {
891 window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
892 window_agg_exec_node::InputOrderMode::PartiallySorted(
893 protobuf::PartiallySortedInputOrderMode { columns },
894 ) => InputOrderMode::PartiallySorted(
895 columns.iter().map(|c| *c as usize).collect(),
896 ),
897 window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
898 };
899
900 Ok(Arc::new(BoundedWindowAggExec::try_new(
901 physical_window_expr,
902 input,
903 input_order_mode,
904 !partition_keys.is_empty(),
905 )?))
906 } else {
907 Ok(Arc::new(WindowAggExec::try_new(
908 physical_window_expr,
909 input,
910 !partition_keys.is_empty(),
911 )?))
912 }
913 }
914
915 fn try_into_aggregate_physical_plan(
916 &self,
917 hash_agg: &protobuf::AggregateExecNode,
918 registry: &dyn FunctionRegistry,
919 runtime: &RuntimeEnv,
920 extension_codec: &dyn PhysicalExtensionCodec,
921 ) -> Result<Arc<dyn ExecutionPlan>> {
922 let input: Arc<dyn ExecutionPlan> =
923 into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
924 let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
925 proto_error(format!(
926 "Received a AggregateNode message with unknown AggregateMode {}",
927 hash_agg.mode
928 ))
929 })?;
930 let agg_mode: AggregateMode = match mode {
931 protobuf::AggregateMode::Partial => AggregateMode::Partial,
932 protobuf::AggregateMode::Final => AggregateMode::Final,
933 protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
934 protobuf::AggregateMode::Single => AggregateMode::Single,
935 protobuf::AggregateMode::SinglePartitioned => {
936 AggregateMode::SinglePartitioned
937 }
938 };
939
940 let num_expr = hash_agg.group_expr.len();
941
942 let group_expr = hash_agg
943 .group_expr
944 .iter()
945 .zip(hash_agg.group_expr_name.iter())
946 .map(|(expr, name)| {
947 parse_physical_expr(
948 expr,
949 registry,
950 input.schema().as_ref(),
951 extension_codec,
952 )
953 .map(|expr| (expr, name.to_string()))
954 })
955 .collect::<Result<Vec<_>, _>>()?;
956
957 let null_expr = hash_agg
958 .null_expr
959 .iter()
960 .zip(hash_agg.group_expr_name.iter())
961 .map(|(expr, name)| {
962 parse_physical_expr(
963 expr,
964 registry,
965 input.schema().as_ref(),
966 extension_codec,
967 )
968 .map(|expr| (expr, name.to_string()))
969 })
970 .collect::<Result<Vec<_>, _>>()?;
971
972 let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
973 hash_agg
974 .groups
975 .chunks(num_expr)
976 .map(|g| g.to_vec())
977 .collect::<Vec<Vec<bool>>>()
978 } else {
979 vec![]
980 };
981
982 let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
983 DataFusionError::Internal(
984 "input_schema in AggregateNode is missing.".to_owned(),
985 )
986 })?;
987 let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
988
989 let physical_filter_expr = hash_agg
990 .filter_expr
991 .iter()
992 .map(|expr| {
993 expr.expr
994 .as_ref()
995 .map(|e| {
996 parse_physical_expr(
997 e,
998 registry,
999 &physical_schema,
1000 extension_codec,
1001 )
1002 })
1003 .transpose()
1004 })
1005 .collect::<Result<Vec<_>, _>>()?;
1006
1007 let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1008 .aggr_expr
1009 .iter()
1010 .zip(hash_agg.aggr_expr_name.iter())
1011 .map(|(expr, name)| {
1012 let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1013 proto_error("Unexpected empty aggregate physical expression")
1014 })?;
1015
1016 match expr_type {
1017 ExprType::AggregateExpr(agg_node) => {
1018 let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1019 .expr
1020 .iter()
1021 .map(|e| {
1022 parse_physical_expr(
1023 e,
1024 registry,
1025 &physical_schema,
1026 extension_codec,
1027 )
1028 })
1029 .collect::<Result<Vec<_>>>()?;
1030 let ordering_req: LexOrdering = agg_node
1031 .ordering_req
1032 .iter()
1033 .map(|e| {
1034 parse_physical_sort_expr(
1035 e,
1036 registry,
1037 &physical_schema,
1038 extension_codec,
1039 )
1040 })
1041 .collect::<Result<LexOrdering>>()?;
1042 agg_node
1043 .aggregate_function
1044 .as_ref()
1045 .map(|func| match func {
1046 AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1047 let agg_udf = match &agg_node.fun_definition {
1048 Some(buf) => extension_codec
1049 .try_decode_udaf(udaf_name, buf)?,
1050 None => registry.udaf(udaf_name)?,
1051 };
1052
1053 AggregateExprBuilder::new(agg_udf, input_phy_expr)
1054 .schema(Arc::clone(&physical_schema))
1055 .alias(name)
1056 .with_ignore_nulls(agg_node.ignore_nulls)
1057 .with_distinct(agg_node.distinct)
1058 .order_by(ordering_req)
1059 .build()
1060 .map(Arc::new)
1061 }
1062 })
1063 .transpose()?
1064 .ok_or_else(|| {
1065 proto_error(
1066 "Invalid AggregateExpr, missing aggregate_function",
1067 )
1068 })
1069 }
1070 _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1071 }
1072 })
1073 .collect::<Result<Vec<_>, _>>()?;
1074
1075 let limit = hash_agg
1076 .limit
1077 .as_ref()
1078 .map(|lit_value| lit_value.limit as usize);
1079
1080 let agg = AggregateExec::try_new(
1081 agg_mode,
1082 PhysicalGroupBy::new(group_expr, null_expr, groups),
1083 physical_aggr_expr,
1084 physical_filter_expr,
1085 input,
1086 physical_schema,
1087 )?;
1088
1089 let agg = agg.with_limit(limit);
1090
1091 Ok(Arc::new(agg))
1092 }
1093
1094 fn try_into_hash_join_physical_plan(
1095 &self,
1096 hashjoin: &protobuf::HashJoinExecNode,
1097 registry: &dyn FunctionRegistry,
1098 runtime: &RuntimeEnv,
1099 extension_codec: &dyn PhysicalExtensionCodec,
1100 ) -> Result<Arc<dyn ExecutionPlan>> {
1101 let left: Arc<dyn ExecutionPlan> =
1102 into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
1103 let right: Arc<dyn ExecutionPlan> =
1104 into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
1105 let left_schema = left.schema();
1106 let right_schema = right.schema();
1107 let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1108 .on
1109 .iter()
1110 .map(|col| {
1111 let left = parse_physical_expr(
1112 &col.left.clone().unwrap(),
1113 registry,
1114 left_schema.as_ref(),
1115 extension_codec,
1116 )?;
1117 let right = parse_physical_expr(
1118 &col.right.clone().unwrap(),
1119 registry,
1120 right_schema.as_ref(),
1121 extension_codec,
1122 )?;
1123 Ok((left, right))
1124 })
1125 .collect::<Result<_>>()?;
1126 let join_type =
1127 protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1128 proto_error(format!(
1129 "Received a HashJoinNode message with unknown JoinType {}",
1130 hashjoin.join_type
1131 ))
1132 })?;
1133 let filter = hashjoin
1134 .filter
1135 .as_ref()
1136 .map(|f| {
1137 let schema = f
1138 .schema
1139 .as_ref()
1140 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1141 .try_into()?;
1142
1143 let expression = parse_physical_expr(
1144 f.expression.as_ref().ok_or_else(|| {
1145 proto_error("Unexpected empty filter expression")
1146 })?,
1147 registry, &schema,
1148 extension_codec,
1149 )?;
1150 let column_indices = f.column_indices
1151 .iter()
1152 .map(|i| {
1153 let side = protobuf::JoinSide::try_from(i.side)
1154 .map_err(|_| proto_error(format!(
1155 "Received a HashJoinNode message with JoinSide in Filter {}",
1156 i.side))
1157 )?;
1158
1159 Ok(ColumnIndex {
1160 index: i.index as usize,
1161 side: side.into(),
1162 })
1163 })
1164 .collect::<Result<Vec<_>>>()?;
1165
1166 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1167 })
1168 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1169
1170 let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1171 .map_err(|_| {
1172 proto_error(format!(
1173 "Received a HashJoinNode message with unknown PartitionMode {}",
1174 hashjoin.partition_mode
1175 ))
1176 })?;
1177 let partition_mode = match partition_mode {
1178 protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1179 protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1180 protobuf::PartitionMode::Auto => PartitionMode::Auto,
1181 };
1182 let projection = if !hashjoin.projection.is_empty() {
1183 Some(
1184 hashjoin
1185 .projection
1186 .iter()
1187 .map(|i| *i as usize)
1188 .collect::<Vec<_>>(),
1189 )
1190 } else {
1191 None
1192 };
1193 Ok(Arc::new(HashJoinExec::try_new(
1194 left,
1195 right,
1196 on,
1197 filter,
1198 &join_type.into(),
1199 projection,
1200 partition_mode,
1201 hashjoin.null_equals_null,
1202 )?))
1203 }
1204
1205 fn try_into_symmetric_hash_join_physical_plan(
1206 &self,
1207 sym_join: &protobuf::SymmetricHashJoinExecNode,
1208 registry: &dyn FunctionRegistry,
1209 runtime: &RuntimeEnv,
1210 extension_codec: &dyn PhysicalExtensionCodec,
1211 ) -> Result<Arc<dyn ExecutionPlan>> {
1212 let left =
1213 into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1214 let right =
1215 into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1216 let left_schema = left.schema();
1217 let right_schema = right.schema();
1218 let on = sym_join
1219 .on
1220 .iter()
1221 .map(|col| {
1222 let left = parse_physical_expr(
1223 &col.left.clone().unwrap(),
1224 registry,
1225 left_schema.as_ref(),
1226 extension_codec,
1227 )?;
1228 let right = parse_physical_expr(
1229 &col.right.clone().unwrap(),
1230 registry,
1231 right_schema.as_ref(),
1232 extension_codec,
1233 )?;
1234 Ok((left, right))
1235 })
1236 .collect::<Result<_>>()?;
1237 let join_type =
1238 protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1239 proto_error(format!(
1240 "Received a SymmetricHashJoin message with unknown JoinType {}",
1241 sym_join.join_type
1242 ))
1243 })?;
1244 let filter = sym_join
1245 .filter
1246 .as_ref()
1247 .map(|f| {
1248 let schema = f
1249 .schema
1250 .as_ref()
1251 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1252 .try_into()?;
1253
1254 let expression = parse_physical_expr(
1255 f.expression.as_ref().ok_or_else(|| {
1256 proto_error("Unexpected empty filter expression")
1257 })?,
1258 registry, &schema,
1259 extension_codec,
1260 )?;
1261 let column_indices = f.column_indices
1262 .iter()
1263 .map(|i| {
1264 let side = protobuf::JoinSide::try_from(i.side)
1265 .map_err(|_| proto_error(format!(
1266 "Received a HashJoinNode message with JoinSide in Filter {}",
1267 i.side))
1268 )?;
1269
1270 Ok(ColumnIndex {
1271 index: i.index as usize,
1272 side: side.into(),
1273 })
1274 })
1275 .collect::<Result<_>>()?;
1276
1277 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1278 })
1279 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1280
1281 let left_sort_exprs = parse_physical_sort_exprs(
1282 &sym_join.left_sort_exprs,
1283 registry,
1284 &left_schema,
1285 extension_codec,
1286 )?;
1287 let left_sort_exprs = if left_sort_exprs.is_empty() {
1288 None
1289 } else {
1290 Some(left_sort_exprs)
1291 };
1292
1293 let right_sort_exprs = parse_physical_sort_exprs(
1294 &sym_join.right_sort_exprs,
1295 registry,
1296 &right_schema,
1297 extension_codec,
1298 )?;
1299 let right_sort_exprs = if right_sort_exprs.is_empty() {
1300 None
1301 } else {
1302 Some(right_sort_exprs)
1303 };
1304
1305 let partition_mode = protobuf::StreamPartitionMode::try_from(
1306 sym_join.partition_mode,
1307 )
1308 .map_err(|_| {
1309 proto_error(format!(
1310 "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1311 sym_join.partition_mode
1312 ))
1313 })?;
1314 let partition_mode = match partition_mode {
1315 protobuf::StreamPartitionMode::SinglePartition => {
1316 StreamJoinPartitionMode::SinglePartition
1317 }
1318 protobuf::StreamPartitionMode::PartitionedExec => {
1319 StreamJoinPartitionMode::Partitioned
1320 }
1321 };
1322 SymmetricHashJoinExec::try_new(
1323 left,
1324 right,
1325 on,
1326 filter,
1327 &join_type.into(),
1328 sym_join.null_equals_null,
1329 left_sort_exprs,
1330 right_sort_exprs,
1331 partition_mode,
1332 )
1333 .map(|e| Arc::new(e) as _)
1334 }
1335
1336 fn try_into_union_physical_plan(
1337 &self,
1338 union: &protobuf::UnionExecNode,
1339 registry: &dyn FunctionRegistry,
1340 runtime: &RuntimeEnv,
1341 extension_codec: &dyn PhysicalExtensionCodec,
1342 ) -> Result<Arc<dyn ExecutionPlan>> {
1343 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1344 for input in &union.inputs {
1345 inputs.push(input.try_into_physical_plan(
1346 registry,
1347 runtime,
1348 extension_codec,
1349 )?);
1350 }
1351 Ok(Arc::new(UnionExec::new(inputs)))
1352 }
1353
1354 fn try_into_interleave_physical_plan(
1355 &self,
1356 interleave: &protobuf::InterleaveExecNode,
1357 registry: &dyn FunctionRegistry,
1358 runtime: &RuntimeEnv,
1359 extension_codec: &dyn PhysicalExtensionCodec,
1360 ) -> Result<Arc<dyn ExecutionPlan>> {
1361 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1362 for input in &interleave.inputs {
1363 inputs.push(input.try_into_physical_plan(
1364 registry,
1365 runtime,
1366 extension_codec,
1367 )?);
1368 }
1369 Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1370 }
1371
1372 fn try_into_cross_join_physical_plan(
1373 &self,
1374 crossjoin: &protobuf::CrossJoinExecNode,
1375 registry: &dyn FunctionRegistry,
1376 runtime: &RuntimeEnv,
1377 extension_codec: &dyn PhysicalExtensionCodec,
1378 ) -> Result<Arc<dyn ExecutionPlan>> {
1379 let left: Arc<dyn ExecutionPlan> =
1380 into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1381 let right: Arc<dyn ExecutionPlan> =
1382 into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1383 Ok(Arc::new(CrossJoinExec::new(left, right)))
1384 }
1385
1386 fn try_into_empty_physical_plan(
1387 &self,
1388 empty: &protobuf::EmptyExecNode,
1389 _registry: &dyn FunctionRegistry,
1390 _runtime: &RuntimeEnv,
1391 _extension_codec: &dyn PhysicalExtensionCodec,
1392 ) -> Result<Arc<dyn ExecutionPlan>> {
1393 let schema = Arc::new(convert_required!(empty.schema)?);
1394 Ok(Arc::new(EmptyExec::new(schema)))
1395 }
1396
1397 fn try_into_placeholder_row_physical_plan(
1398 &self,
1399 placeholder: &protobuf::PlaceholderRowExecNode,
1400 _registry: &dyn FunctionRegistry,
1401 _runtime: &RuntimeEnv,
1402 _extension_codec: &dyn PhysicalExtensionCodec,
1403 ) -> Result<Arc<dyn ExecutionPlan>> {
1404 let schema = Arc::new(convert_required!(placeholder.schema)?);
1405 Ok(Arc::new(PlaceholderRowExec::new(schema)))
1406 }
1407
1408 fn try_into_sort_physical_plan(
1409 &self,
1410 sort: &protobuf::SortExecNode,
1411 registry: &dyn FunctionRegistry,
1412 runtime: &RuntimeEnv,
1413 extension_codec: &dyn PhysicalExtensionCodec,
1414 ) -> Result<Arc<dyn ExecutionPlan>> {
1415 let input: Arc<dyn ExecutionPlan> =
1416 into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1417 let exprs = sort
1418 .expr
1419 .iter()
1420 .map(|expr| {
1421 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1422 proto_error(format!(
1423 "physical_plan::from_proto() Unexpected expr {self:?}"
1424 ))
1425 })?;
1426 if let ExprType::Sort(sort_expr) = expr {
1427 let expr = sort_expr
1428 .expr
1429 .as_ref()
1430 .ok_or_else(|| {
1431 proto_error(format!(
1432 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1433 ))
1434 })?
1435 .as_ref();
1436 Ok(PhysicalSortExpr {
1437 expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
1438 options: SortOptions {
1439 descending: !sort_expr.asc,
1440 nulls_first: sort_expr.nulls_first,
1441 },
1442 })
1443 } else {
1444 internal_err!(
1445 "physical_plan::from_proto() {self:?}"
1446 )
1447 }
1448 })
1449 .collect::<Result<LexOrdering, _>>()?;
1450 let fetch = if sort.fetch < 0 {
1451 None
1452 } else {
1453 Some(sort.fetch as usize)
1454 };
1455 let new_sort = SortExec::new(exprs, input)
1456 .with_fetch(fetch)
1457 .with_preserve_partitioning(sort.preserve_partitioning);
1458
1459 Ok(Arc::new(new_sort))
1460 }
1461
1462 fn try_into_sort_preserving_merge_physical_plan(
1463 &self,
1464 sort: &protobuf::SortPreservingMergeExecNode,
1465 registry: &dyn FunctionRegistry,
1466 runtime: &RuntimeEnv,
1467 extension_codec: &dyn PhysicalExtensionCodec,
1468 ) -> Result<Arc<dyn ExecutionPlan>> {
1469 let input: Arc<dyn ExecutionPlan> =
1470 into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1471 let exprs = sort
1472 .expr
1473 .iter()
1474 .map(|expr| {
1475 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1476 proto_error(format!(
1477 "physical_plan::from_proto() Unexpected expr {self:?}"
1478 ))
1479 })?;
1480 if let ExprType::Sort(sort_expr) = expr {
1481 let expr = sort_expr
1482 .expr
1483 .as_ref()
1484 .ok_or_else(|| {
1485 proto_error(format!(
1486 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1487 ))
1488 })?
1489 .as_ref();
1490 Ok(PhysicalSortExpr {
1491 expr: parse_physical_expr(
1492 expr,
1493 registry,
1494 input.schema().as_ref(),
1495 extension_codec,
1496 )?,
1497 options: SortOptions {
1498 descending: !sort_expr.asc,
1499 nulls_first: sort_expr.nulls_first,
1500 },
1501 })
1502 } else {
1503 internal_err!("physical_plan::from_proto() {self:?}")
1504 }
1505 })
1506 .collect::<Result<LexOrdering, _>>()?;
1507 let fetch = if sort.fetch < 0 {
1508 None
1509 } else {
1510 Some(sort.fetch as usize)
1511 };
1512 Ok(Arc::new(
1513 SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
1514 ))
1515 }
1516
1517 fn try_into_extension_physical_plan(
1518 &self,
1519 extension: &protobuf::PhysicalExtensionNode,
1520 registry: &dyn FunctionRegistry,
1521 runtime: &RuntimeEnv,
1522 extension_codec: &dyn PhysicalExtensionCodec,
1523 ) -> Result<Arc<dyn ExecutionPlan>> {
1524 let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1525 .inputs
1526 .iter()
1527 .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1528 .collect::<Result<_>>()?;
1529
1530 let extension_node =
1531 extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1532
1533 Ok(extension_node)
1534 }
1535
1536 fn try_into_nested_loop_join_physical_plan(
1537 &self,
1538 join: &protobuf::NestedLoopJoinExecNode,
1539 registry: &dyn FunctionRegistry,
1540 runtime: &RuntimeEnv,
1541 extension_codec: &dyn PhysicalExtensionCodec,
1542 ) -> Result<Arc<dyn ExecutionPlan>> {
1543 let left: Arc<dyn ExecutionPlan> =
1544 into_physical_plan(&join.left, registry, runtime, extension_codec)?;
1545 let right: Arc<dyn ExecutionPlan> =
1546 into_physical_plan(&join.right, registry, runtime, extension_codec)?;
1547 let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1548 proto_error(format!(
1549 "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1550 join.join_type
1551 ))
1552 })?;
1553 let filter = join
1554 .filter
1555 .as_ref()
1556 .map(|f| {
1557 let schema = f
1558 .schema
1559 .as_ref()
1560 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1561 .try_into()?;
1562
1563 let expression = parse_physical_expr(
1564 f.expression.as_ref().ok_or_else(|| {
1565 proto_error("Unexpected empty filter expression")
1566 })?,
1567 registry, &schema,
1568 extension_codec,
1569 )?;
1570 let column_indices = f.column_indices
1571 .iter()
1572 .map(|i| {
1573 let side = protobuf::JoinSide::try_from(i.side)
1574 .map_err(|_| proto_error(format!(
1575 "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1576 i.side))
1577 )?;
1578
1579 Ok(ColumnIndex {
1580 index: i.index as usize,
1581 side: side.into(),
1582 })
1583 })
1584 .collect::<Result<Vec<_>>>()?;
1585
1586 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1587 })
1588 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1589
1590 let projection = if !join.projection.is_empty() {
1591 Some(
1592 join.projection
1593 .iter()
1594 .map(|i| *i as usize)
1595 .collect::<Vec<_>>(),
1596 )
1597 } else {
1598 None
1599 };
1600
1601 Ok(Arc::new(NestedLoopJoinExec::try_new(
1602 left,
1603 right,
1604 filter,
1605 &join_type.into(),
1606 projection,
1607 )?))
1608 }
1609
1610 fn try_into_analyze_physical_plan(
1611 &self,
1612 analyze: &protobuf::AnalyzeExecNode,
1613 registry: &dyn FunctionRegistry,
1614 runtime: &RuntimeEnv,
1615 extension_codec: &dyn PhysicalExtensionCodec,
1616 ) -> Result<Arc<dyn ExecutionPlan>> {
1617 let input: Arc<dyn ExecutionPlan> =
1618 into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1619 Ok(Arc::new(AnalyzeExec::new(
1620 analyze.verbose,
1621 analyze.show_statistics,
1622 input,
1623 Arc::new(convert_required!(analyze.schema)?),
1624 )))
1625 }
1626
1627 fn try_into_json_sink_physical_plan(
1628 &self,
1629 sink: &protobuf::JsonSinkExecNode,
1630 registry: &dyn FunctionRegistry,
1631 runtime: &RuntimeEnv,
1632 extension_codec: &dyn PhysicalExtensionCodec,
1633 ) -> Result<Arc<dyn ExecutionPlan>> {
1634 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1635
1636 let data_sink: JsonSink = sink
1637 .sink
1638 .as_ref()
1639 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1640 .try_into()?;
1641 let sink_schema = input.schema();
1642 let sort_order = sink
1643 .sort_order
1644 .as_ref()
1645 .map(|collection| {
1646 parse_physical_sort_exprs(
1647 &collection.physical_sort_expr_nodes,
1648 registry,
1649 &sink_schema,
1650 extension_codec,
1651 )
1652 .map(LexRequirement::from)
1653 })
1654 .transpose()?;
1655 Ok(Arc::new(DataSinkExec::new(
1656 input,
1657 Arc::new(data_sink),
1658 sort_order,
1659 )))
1660 }
1661
1662 fn try_into_csv_sink_physical_plan(
1663 &self,
1664 sink: &protobuf::CsvSinkExecNode,
1665 registry: &dyn FunctionRegistry,
1666 runtime: &RuntimeEnv,
1667 extension_codec: &dyn PhysicalExtensionCodec,
1668 ) -> Result<Arc<dyn ExecutionPlan>> {
1669 let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1670
1671 let data_sink: CsvSink = sink
1672 .sink
1673 .as_ref()
1674 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1675 .try_into()?;
1676 let sink_schema = input.schema();
1677 let sort_order = sink
1678 .sort_order
1679 .as_ref()
1680 .map(|collection| {
1681 parse_physical_sort_exprs(
1682 &collection.physical_sort_expr_nodes,
1683 registry,
1684 &sink_schema,
1685 extension_codec,
1686 )
1687 .map(LexRequirement::from)
1688 })
1689 .transpose()?;
1690 Ok(Arc::new(DataSinkExec::new(
1691 input,
1692 Arc::new(data_sink),
1693 sort_order,
1694 )))
1695 }
1696
1697 fn try_into_parquet_sink_physical_plan(
1698 &self,
1699 sink: &protobuf::ParquetSinkExecNode,
1700 registry: &dyn FunctionRegistry,
1701 runtime: &RuntimeEnv,
1702 extension_codec: &dyn PhysicalExtensionCodec,
1703 ) -> Result<Arc<dyn ExecutionPlan>> {
1704 #[cfg(feature = "parquet")]
1705 {
1706 let input =
1707 into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1708
1709 let data_sink: ParquetSink = sink
1710 .sink
1711 .as_ref()
1712 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1713 .try_into()?;
1714 let sink_schema = input.schema();
1715 let sort_order = sink
1716 .sort_order
1717 .as_ref()
1718 .map(|collection| {
1719 parse_physical_sort_exprs(
1720 &collection.physical_sort_expr_nodes,
1721 registry,
1722 &sink_schema,
1723 extension_codec,
1724 )
1725 .map(LexRequirement::from)
1726 })
1727 .transpose()?;
1728 Ok(Arc::new(DataSinkExec::new(
1729 input,
1730 Arc::new(data_sink),
1731 sort_order,
1732 )))
1733 }
1734 #[cfg(not(feature = "parquet"))]
1735 panic!("Trying to use ParquetSink without `parquet` feature enabled");
1736 }
1737
1738 fn try_into_unnest_physical_plan(
1739 &self,
1740 unnest: &protobuf::UnnestExecNode,
1741 registry: &dyn FunctionRegistry,
1742 runtime: &RuntimeEnv,
1743 extension_codec: &dyn PhysicalExtensionCodec,
1744 ) -> Result<Arc<dyn ExecutionPlan>> {
1745 let input =
1746 into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1747
1748 Ok(Arc::new(UnnestExec::new(
1749 input,
1750 unnest
1751 .list_type_columns
1752 .iter()
1753 .map(|c| ListUnnest {
1754 index_in_input_schema: c.index_in_input_schema as _,
1755 depth: c.depth as _,
1756 })
1757 .collect(),
1758 unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1759 Arc::new(convert_required!(unnest.schema)?),
1760 into_required!(unnest.options)?,
1761 )))
1762 }
1763
1764 fn try_from_explain_exec(
1765 exec: &ExplainExec,
1766 _extension_codec: &dyn PhysicalExtensionCodec,
1767 ) -> Result<Self> {
1768 Ok(protobuf::PhysicalPlanNode {
1769 physical_plan_type: Some(PhysicalPlanType::Explain(
1770 protobuf::ExplainExecNode {
1771 schema: Some(exec.schema().as_ref().try_into()?),
1772 stringified_plans: exec
1773 .stringified_plans()
1774 .iter()
1775 .map(|plan| plan.into())
1776 .collect(),
1777 verbose: exec.verbose(),
1778 },
1779 )),
1780 })
1781 }
1782
1783 fn try_from_projection_exec(
1784 exec: &ProjectionExec,
1785 extension_codec: &dyn PhysicalExtensionCodec,
1786 ) -> Result<Self> {
1787 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1788 exec.input().to_owned(),
1789 extension_codec,
1790 )?;
1791 let expr = exec
1792 .expr()
1793 .iter()
1794 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1795 .collect::<Result<Vec<_>>>()?;
1796 let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1797 Ok(protobuf::PhysicalPlanNode {
1798 physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1799 protobuf::ProjectionExecNode {
1800 input: Some(Box::new(input)),
1801 expr,
1802 expr_name,
1803 },
1804 ))),
1805 })
1806 }
1807
1808 fn try_from_analyze_exec(
1809 exec: &AnalyzeExec,
1810 extension_codec: &dyn PhysicalExtensionCodec,
1811 ) -> Result<Self> {
1812 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1813 exec.input().to_owned(),
1814 extension_codec,
1815 )?;
1816 Ok(protobuf::PhysicalPlanNode {
1817 physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1818 protobuf::AnalyzeExecNode {
1819 verbose: exec.verbose(),
1820 show_statistics: exec.show_statistics(),
1821 input: Some(Box::new(input)),
1822 schema: Some(exec.schema().as_ref().try_into()?),
1823 },
1824 ))),
1825 })
1826 }
1827
1828 fn try_from_filter_exec(
1829 exec: &FilterExec,
1830 extension_codec: &dyn PhysicalExtensionCodec,
1831 ) -> Result<Self> {
1832 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1833 exec.input().to_owned(),
1834 extension_codec,
1835 )?;
1836 Ok(protobuf::PhysicalPlanNode {
1837 physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1838 protobuf::FilterExecNode {
1839 input: Some(Box::new(input)),
1840 expr: Some(serialize_physical_expr(
1841 exec.predicate(),
1842 extension_codec,
1843 )?),
1844 default_filter_selectivity: exec.default_selectivity() as u32,
1845 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1846 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1847 }),
1848 },
1849 ))),
1850 })
1851 }
1852
1853 fn try_from_global_limit_exec(
1854 limit: &GlobalLimitExec,
1855 extension_codec: &dyn PhysicalExtensionCodec,
1856 ) -> Result<Self> {
1857 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1858 limit.input().to_owned(),
1859 extension_codec,
1860 )?;
1861
1862 Ok(protobuf::PhysicalPlanNode {
1863 physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1864 protobuf::GlobalLimitExecNode {
1865 input: Some(Box::new(input)),
1866 skip: limit.skip() as u32,
1867 fetch: match limit.fetch() {
1868 Some(n) => n as i64,
1869 _ => -1, },
1871 },
1872 ))),
1873 })
1874 }
1875
1876 fn try_from_local_limit_exec(
1877 limit: &LocalLimitExec,
1878 extension_codec: &dyn PhysicalExtensionCodec,
1879 ) -> Result<Self> {
1880 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1881 limit.input().to_owned(),
1882 extension_codec,
1883 )?;
1884 Ok(protobuf::PhysicalPlanNode {
1885 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1886 protobuf::LocalLimitExecNode {
1887 input: Some(Box::new(input)),
1888 fetch: limit.fetch() as u32,
1889 },
1890 ))),
1891 })
1892 }
1893
1894 fn try_from_hash_join_exec(
1895 exec: &HashJoinExec,
1896 extension_codec: &dyn PhysicalExtensionCodec,
1897 ) -> Result<Self> {
1898 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1899 exec.left().to_owned(),
1900 extension_codec,
1901 )?;
1902 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1903 exec.right().to_owned(),
1904 extension_codec,
1905 )?;
1906 let on: Vec<protobuf::JoinOn> = exec
1907 .on()
1908 .iter()
1909 .map(|tuple| {
1910 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1911 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1912 Ok::<_, DataFusionError>(protobuf::JoinOn {
1913 left: Some(l),
1914 right: Some(r),
1915 })
1916 })
1917 .collect::<Result<_>>()?;
1918 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1919 let filter = exec
1920 .filter()
1921 .as_ref()
1922 .map(|f| {
1923 let expression =
1924 serialize_physical_expr(f.expression(), extension_codec)?;
1925 let column_indices = f
1926 .column_indices()
1927 .iter()
1928 .map(|i| {
1929 let side: protobuf::JoinSide = i.side.to_owned().into();
1930 protobuf::ColumnIndex {
1931 index: i.index as u32,
1932 side: side.into(),
1933 }
1934 })
1935 .collect();
1936 let schema = f.schema().as_ref().try_into()?;
1937 Ok(protobuf::JoinFilter {
1938 expression: Some(expression),
1939 column_indices,
1940 schema: Some(schema),
1941 })
1942 })
1943 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
1944
1945 let partition_mode = match exec.partition_mode() {
1946 PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
1947 PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
1948 PartitionMode::Auto => protobuf::PartitionMode::Auto,
1949 };
1950
1951 Ok(protobuf::PhysicalPlanNode {
1952 physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
1953 protobuf::HashJoinExecNode {
1954 left: Some(Box::new(left)),
1955 right: Some(Box::new(right)),
1956 on,
1957 join_type: join_type.into(),
1958 partition_mode: partition_mode.into(),
1959 null_equals_null: exec.null_equals_null(),
1960 filter,
1961 projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
1962 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1963 }),
1964 },
1965 ))),
1966 })
1967 }
1968
1969 fn try_from_symmetric_hash_join_exec(
1970 exec: &SymmetricHashJoinExec,
1971 extension_codec: &dyn PhysicalExtensionCodec,
1972 ) -> Result<Self> {
1973 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1974 exec.left().to_owned(),
1975 extension_codec,
1976 )?;
1977 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1978 exec.right().to_owned(),
1979 extension_codec,
1980 )?;
1981 let on = exec
1982 .on()
1983 .iter()
1984 .map(|tuple| {
1985 let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1986 let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1987 Ok::<_, DataFusionError>(protobuf::JoinOn {
1988 left: Some(l),
1989 right: Some(r),
1990 })
1991 })
1992 .collect::<Result<_>>()?;
1993 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1994 let filter = exec
1995 .filter()
1996 .as_ref()
1997 .map(|f| {
1998 let expression =
1999 serialize_physical_expr(f.expression(), extension_codec)?;
2000 let column_indices = f
2001 .column_indices()
2002 .iter()
2003 .map(|i| {
2004 let side: protobuf::JoinSide = i.side.to_owned().into();
2005 protobuf::ColumnIndex {
2006 index: i.index as u32,
2007 side: side.into(),
2008 }
2009 })
2010 .collect();
2011 let schema = f.schema().as_ref().try_into()?;
2012 Ok(protobuf::JoinFilter {
2013 expression: Some(expression),
2014 column_indices,
2015 schema: Some(schema),
2016 })
2017 })
2018 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2019
2020 let partition_mode = match exec.partition_mode() {
2021 StreamJoinPartitionMode::SinglePartition => {
2022 protobuf::StreamPartitionMode::SinglePartition
2023 }
2024 StreamJoinPartitionMode::Partitioned => {
2025 protobuf::StreamPartitionMode::PartitionedExec
2026 }
2027 };
2028
2029 let left_sort_exprs = exec
2030 .left_sort_exprs()
2031 .map(|exprs| {
2032 exprs
2033 .iter()
2034 .map(|expr| {
2035 Ok(protobuf::PhysicalSortExprNode {
2036 expr: Some(Box::new(serialize_physical_expr(
2037 &expr.expr,
2038 extension_codec,
2039 )?)),
2040 asc: !expr.options.descending,
2041 nulls_first: expr.options.nulls_first,
2042 })
2043 })
2044 .collect::<Result<Vec<_>>>()
2045 })
2046 .transpose()?
2047 .unwrap_or(vec![]);
2048
2049 let right_sort_exprs = exec
2050 .right_sort_exprs()
2051 .map(|exprs| {
2052 exprs
2053 .iter()
2054 .map(|expr| {
2055 Ok(protobuf::PhysicalSortExprNode {
2056 expr: Some(Box::new(serialize_physical_expr(
2057 &expr.expr,
2058 extension_codec,
2059 )?)),
2060 asc: !expr.options.descending,
2061 nulls_first: expr.options.nulls_first,
2062 })
2063 })
2064 .collect::<Result<Vec<_>>>()
2065 })
2066 .transpose()?
2067 .unwrap_or(vec![]);
2068
2069 Ok(protobuf::PhysicalPlanNode {
2070 physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2071 protobuf::SymmetricHashJoinExecNode {
2072 left: Some(Box::new(left)),
2073 right: Some(Box::new(right)),
2074 on,
2075 join_type: join_type.into(),
2076 partition_mode: partition_mode.into(),
2077 null_equals_null: exec.null_equals_null(),
2078 left_sort_exprs,
2079 right_sort_exprs,
2080 filter,
2081 },
2082 ))),
2083 })
2084 }
2085
2086 fn try_from_cross_join_exec(
2087 exec: &CrossJoinExec,
2088 extension_codec: &dyn PhysicalExtensionCodec,
2089 ) -> Result<Self> {
2090 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2091 exec.left().to_owned(),
2092 extension_codec,
2093 )?;
2094 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2095 exec.right().to_owned(),
2096 extension_codec,
2097 )?;
2098 Ok(protobuf::PhysicalPlanNode {
2099 physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2100 protobuf::CrossJoinExecNode {
2101 left: Some(Box::new(left)),
2102 right: Some(Box::new(right)),
2103 },
2104 ))),
2105 })
2106 }
2107
2108 fn try_from_aggregate_exec(
2109 exec: &AggregateExec,
2110 extension_codec: &dyn PhysicalExtensionCodec,
2111 ) -> Result<Self> {
2112 let groups: Vec<bool> = exec
2113 .group_expr()
2114 .groups()
2115 .iter()
2116 .flatten()
2117 .copied()
2118 .collect();
2119
2120 let group_names = exec
2121 .group_expr()
2122 .expr()
2123 .iter()
2124 .map(|expr| expr.1.to_owned())
2125 .collect();
2126
2127 let filter = exec
2128 .filter_expr()
2129 .iter()
2130 .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2131 .collect::<Result<Vec<_>>>()?;
2132
2133 let agg = exec
2134 .aggr_expr()
2135 .iter()
2136 .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2137 .collect::<Result<Vec<_>>>()?;
2138
2139 let agg_names = exec
2140 .aggr_expr()
2141 .iter()
2142 .map(|expr| expr.name().to_string())
2143 .collect::<Vec<_>>();
2144
2145 let agg_mode = match exec.mode() {
2146 AggregateMode::Partial => protobuf::AggregateMode::Partial,
2147 AggregateMode::Final => protobuf::AggregateMode::Final,
2148 AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2149 AggregateMode::Single => protobuf::AggregateMode::Single,
2150 AggregateMode::SinglePartitioned => {
2151 protobuf::AggregateMode::SinglePartitioned
2152 }
2153 };
2154 let input_schema = exec.input_schema();
2155 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2156 exec.input().to_owned(),
2157 extension_codec,
2158 )?;
2159
2160 let null_expr = exec
2161 .group_expr()
2162 .null_expr()
2163 .iter()
2164 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2165 .collect::<Result<Vec<_>>>()?;
2166
2167 let group_expr = exec
2168 .group_expr()
2169 .expr()
2170 .iter()
2171 .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2172 .collect::<Result<Vec<_>>>()?;
2173
2174 let limit = exec.limit().map(|value| protobuf::AggLimit {
2175 limit: value as u64,
2176 });
2177
2178 Ok(protobuf::PhysicalPlanNode {
2179 physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2180 protobuf::AggregateExecNode {
2181 group_expr,
2182 group_expr_name: group_names,
2183 aggr_expr: agg,
2184 filter_expr: filter,
2185 aggr_expr_name: agg_names,
2186 mode: agg_mode as i32,
2187 input: Some(Box::new(input)),
2188 input_schema: Some(input_schema.as_ref().try_into()?),
2189 null_expr,
2190 groups,
2191 limit,
2192 },
2193 ))),
2194 })
2195 }
2196
2197 fn try_from_empty_exec(
2198 empty: &EmptyExec,
2199 _extension_codec: &dyn PhysicalExtensionCodec,
2200 ) -> Result<Self> {
2201 let schema = empty.schema().as_ref().try_into()?;
2202 Ok(protobuf::PhysicalPlanNode {
2203 physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2204 schema: Some(schema),
2205 })),
2206 })
2207 }
2208
2209 fn try_from_placeholder_row_exec(
2210 empty: &PlaceholderRowExec,
2211 _extension_codec: &dyn PhysicalExtensionCodec,
2212 ) -> Result<Self> {
2213 let schema = empty.schema().as_ref().try_into()?;
2214 Ok(protobuf::PhysicalPlanNode {
2215 physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2216 protobuf::PlaceholderRowExecNode {
2217 schema: Some(schema),
2218 },
2219 )),
2220 })
2221 }
2222
2223 fn try_from_coalesce_batches_exec(
2224 coalesce_batches: &CoalesceBatchesExec,
2225 extension_codec: &dyn PhysicalExtensionCodec,
2226 ) -> Result<Self> {
2227 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2228 coalesce_batches.input().to_owned(),
2229 extension_codec,
2230 )?;
2231 Ok(protobuf::PhysicalPlanNode {
2232 physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2233 protobuf::CoalesceBatchesExecNode {
2234 input: Some(Box::new(input)),
2235 target_batch_size: coalesce_batches.target_batch_size() as u32,
2236 fetch: coalesce_batches.fetch().map(|n| n as u32),
2237 },
2238 ))),
2239 })
2240 }
2241
2242 fn try_from_data_source_exec(
2243 data_source_exec: &DataSourceExec,
2244 extension_codec: &dyn PhysicalExtensionCodec,
2245 ) -> Result<Option<Self>> {
2246 let data_source = data_source_exec.data_source();
2247 if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2248 let source = maybe_csv.file_source();
2249 if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2250 return Ok(Some(protobuf::PhysicalPlanNode {
2251 physical_plan_type: Some(PhysicalPlanType::CsvScan(
2252 protobuf::CsvScanExecNode {
2253 base_conf: Some(serialize_file_scan_config(
2254 maybe_csv,
2255 extension_codec,
2256 )?),
2257 has_header: csv_config.has_header(),
2258 delimiter: byte_to_string(
2259 csv_config.delimiter(),
2260 "delimiter",
2261 )?,
2262 quote: byte_to_string(csv_config.quote(), "quote")?,
2263 optional_escape: if let Some(escape) = csv_config.escape() {
2264 Some(
2265 protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2266 byte_to_string(escape, "escape")?,
2267 ),
2268 )
2269 } else {
2270 None
2271 },
2272 optional_comment: if let Some(comment) = csv_config.comment()
2273 {
2274 Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2275 byte_to_string(comment, "comment")?,
2276 ))
2277 } else {
2278 None
2279 },
2280 newlines_in_values: maybe_csv.newlines_in_values(),
2281 },
2282 )),
2283 }));
2284 }
2285 }
2286
2287 if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2288 let source = scan_conf.file_source();
2289 if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2290 return Ok(Some(protobuf::PhysicalPlanNode {
2291 physical_plan_type: Some(PhysicalPlanType::JsonScan(
2292 protobuf::JsonScanExecNode {
2293 base_conf: Some(serialize_file_scan_config(
2294 scan_conf,
2295 extension_codec,
2296 )?),
2297 },
2298 )),
2299 }));
2300 }
2301 }
2302
2303 #[cfg(feature = "parquet")]
2304 if let Some((maybe_parquet, conf)) =
2305 data_source_exec.downcast_to_file_source::<ParquetSource>()
2306 {
2307 let predicate = conf
2308 .predicate()
2309 .map(|pred| serialize_physical_expr(pred, extension_codec))
2310 .transpose()?;
2311 return Ok(Some(protobuf::PhysicalPlanNode {
2312 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2313 protobuf::ParquetScanExecNode {
2314 base_conf: Some(serialize_file_scan_config(
2315 maybe_parquet,
2316 extension_codec,
2317 )?),
2318 predicate,
2319 parquet_options: Some(conf.table_parquet_options().try_into()?),
2320 },
2321 )),
2322 }));
2323 }
2324
2325 #[cfg(feature = "avro")]
2326 if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2327 let source = maybe_avro.file_source();
2328 if source.as_any().downcast_ref::<AvroSource>().is_some() {
2329 return Ok(Some(protobuf::PhysicalPlanNode {
2330 physical_plan_type: Some(PhysicalPlanType::AvroScan(
2331 protobuf::AvroScanExecNode {
2332 base_conf: Some(serialize_file_scan_config(
2333 maybe_avro,
2334 extension_codec,
2335 )?),
2336 },
2337 )),
2338 }));
2339 }
2340 }
2341
2342 Ok(None)
2343 }
2344
2345 fn try_from_coalesce_partitions_exec(
2346 exec: &CoalescePartitionsExec,
2347 extension_codec: &dyn PhysicalExtensionCodec,
2348 ) -> Result<Self> {
2349 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2350 exec.input().to_owned(),
2351 extension_codec,
2352 )?;
2353 Ok(protobuf::PhysicalPlanNode {
2354 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2355 protobuf::CoalescePartitionsExecNode {
2356 input: Some(Box::new(input)),
2357 },
2358 ))),
2359 })
2360 }
2361
2362 fn try_from_repartition_exec(
2363 exec: &RepartitionExec,
2364 extension_codec: &dyn PhysicalExtensionCodec,
2365 ) -> Result<Self> {
2366 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2367 exec.input().to_owned(),
2368 extension_codec,
2369 )?;
2370
2371 let pb_partitioning =
2372 serialize_partitioning(exec.partitioning(), extension_codec)?;
2373
2374 Ok(protobuf::PhysicalPlanNode {
2375 physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2376 protobuf::RepartitionExecNode {
2377 input: Some(Box::new(input)),
2378 partitioning: Some(pb_partitioning),
2379 },
2380 ))),
2381 })
2382 }
2383
2384 fn try_from_sort_exec(
2385 exec: &SortExec,
2386 extension_codec: &dyn PhysicalExtensionCodec,
2387 ) -> Result<Self> {
2388 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2389 exec.input().to_owned(),
2390 extension_codec,
2391 )?;
2392 let expr = exec
2393 .expr()
2394 .iter()
2395 .map(|expr| {
2396 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2397 expr: Some(Box::new(serialize_physical_expr(
2398 &expr.expr,
2399 extension_codec,
2400 )?)),
2401 asc: !expr.options.descending,
2402 nulls_first: expr.options.nulls_first,
2403 });
2404 Ok(protobuf::PhysicalExprNode {
2405 expr_type: Some(ExprType::Sort(sort_expr)),
2406 })
2407 })
2408 .collect::<Result<Vec<_>>>()?;
2409 Ok(protobuf::PhysicalPlanNode {
2410 physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2411 protobuf::SortExecNode {
2412 input: Some(Box::new(input)),
2413 expr,
2414 fetch: match exec.fetch() {
2415 Some(n) => n as i64,
2416 _ => -1,
2417 },
2418 preserve_partitioning: exec.preserve_partitioning(),
2419 },
2420 ))),
2421 })
2422 }
2423
2424 fn try_from_union_exec(
2425 union: &UnionExec,
2426 extension_codec: &dyn PhysicalExtensionCodec,
2427 ) -> Result<Self> {
2428 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2429 for input in union.inputs() {
2430 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2431 input.to_owned(),
2432 extension_codec,
2433 )?);
2434 }
2435 Ok(protobuf::PhysicalPlanNode {
2436 physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2437 inputs,
2438 })),
2439 })
2440 }
2441
2442 fn try_from_interleave_exec(
2443 interleave: &InterleaveExec,
2444 extension_codec: &dyn PhysicalExtensionCodec,
2445 ) -> Result<Self> {
2446 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2447 for input in interleave.inputs() {
2448 inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2449 input.to_owned(),
2450 extension_codec,
2451 )?);
2452 }
2453 Ok(protobuf::PhysicalPlanNode {
2454 physical_plan_type: Some(PhysicalPlanType::Interleave(
2455 protobuf::InterleaveExecNode { inputs },
2456 )),
2457 })
2458 }
2459
2460 fn try_from_sort_preserving_merge_exec(
2461 exec: &SortPreservingMergeExec,
2462 extension_codec: &dyn PhysicalExtensionCodec,
2463 ) -> Result<Self> {
2464 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2465 exec.input().to_owned(),
2466 extension_codec,
2467 )?;
2468 let expr = exec
2469 .expr()
2470 .iter()
2471 .map(|expr| {
2472 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2473 expr: Some(Box::new(serialize_physical_expr(
2474 &expr.expr,
2475 extension_codec,
2476 )?)),
2477 asc: !expr.options.descending,
2478 nulls_first: expr.options.nulls_first,
2479 });
2480 Ok(protobuf::PhysicalExprNode {
2481 expr_type: Some(ExprType::Sort(sort_expr)),
2482 })
2483 })
2484 .collect::<Result<Vec<_>>>()?;
2485 Ok(protobuf::PhysicalPlanNode {
2486 physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2487 protobuf::SortPreservingMergeExecNode {
2488 input: Some(Box::new(input)),
2489 expr,
2490 fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2491 },
2492 ))),
2493 })
2494 }
2495
2496 fn try_from_nested_loop_join_exec(
2497 exec: &NestedLoopJoinExec,
2498 extension_codec: &dyn PhysicalExtensionCodec,
2499 ) -> Result<Self> {
2500 let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2501 exec.left().to_owned(),
2502 extension_codec,
2503 )?;
2504 let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2505 exec.right().to_owned(),
2506 extension_codec,
2507 )?;
2508
2509 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2510 let filter = exec
2511 .filter()
2512 .as_ref()
2513 .map(|f| {
2514 let expression =
2515 serialize_physical_expr(f.expression(), extension_codec)?;
2516 let column_indices = f
2517 .column_indices()
2518 .iter()
2519 .map(|i| {
2520 let side: protobuf::JoinSide = i.side.to_owned().into();
2521 protobuf::ColumnIndex {
2522 index: i.index as u32,
2523 side: side.into(),
2524 }
2525 })
2526 .collect();
2527 let schema = f.schema().as_ref().try_into()?;
2528 Ok(protobuf::JoinFilter {
2529 expression: Some(expression),
2530 column_indices,
2531 schema: Some(schema),
2532 })
2533 })
2534 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2535
2536 Ok(protobuf::PhysicalPlanNode {
2537 physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2538 protobuf::NestedLoopJoinExecNode {
2539 left: Some(Box::new(left)),
2540 right: Some(Box::new(right)),
2541 join_type: join_type.into(),
2542 filter,
2543 projection: exec.projection().map_or_else(Vec::new, |v| {
2544 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2545 }),
2546 },
2547 ))),
2548 })
2549 }
2550
2551 fn try_from_window_agg_exec(
2552 exec: &WindowAggExec,
2553 extension_codec: &dyn PhysicalExtensionCodec,
2554 ) -> Result<Self> {
2555 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2556 exec.input().to_owned(),
2557 extension_codec,
2558 )?;
2559
2560 let window_expr = exec
2561 .window_expr()
2562 .iter()
2563 .map(|e| serialize_physical_window_expr(e, extension_codec))
2564 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2565
2566 let partition_keys = exec
2567 .partition_keys()
2568 .iter()
2569 .map(|e| serialize_physical_expr(e, extension_codec))
2570 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2571
2572 Ok(protobuf::PhysicalPlanNode {
2573 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2574 protobuf::WindowAggExecNode {
2575 input: Some(Box::new(input)),
2576 window_expr,
2577 partition_keys,
2578 input_order_mode: None,
2579 },
2580 ))),
2581 })
2582 }
2583
2584 fn try_from_bounded_window_agg_exec(
2585 exec: &BoundedWindowAggExec,
2586 extension_codec: &dyn PhysicalExtensionCodec,
2587 ) -> Result<Self> {
2588 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2589 exec.input().to_owned(),
2590 extension_codec,
2591 )?;
2592
2593 let window_expr = exec
2594 .window_expr()
2595 .iter()
2596 .map(|e| serialize_physical_window_expr(e, extension_codec))
2597 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2598
2599 let partition_keys = exec
2600 .partition_keys()
2601 .iter()
2602 .map(|e| serialize_physical_expr(e, extension_codec))
2603 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2604
2605 let input_order_mode = match &exec.input_order_mode {
2606 InputOrderMode::Linear => {
2607 window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2608 }
2609 InputOrderMode::PartiallySorted(columns) => {
2610 window_agg_exec_node::InputOrderMode::PartiallySorted(
2611 protobuf::PartiallySortedInputOrderMode {
2612 columns: columns.iter().map(|c| *c as u64).collect(),
2613 },
2614 )
2615 }
2616 InputOrderMode::Sorted => {
2617 window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2618 }
2619 };
2620
2621 Ok(protobuf::PhysicalPlanNode {
2622 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2623 protobuf::WindowAggExecNode {
2624 input: Some(Box::new(input)),
2625 window_expr,
2626 partition_keys,
2627 input_order_mode: Some(input_order_mode),
2628 },
2629 ))),
2630 })
2631 }
2632
2633 fn try_from_data_sink_exec(
2634 exec: &DataSinkExec,
2635 extension_codec: &dyn PhysicalExtensionCodec,
2636 ) -> Result<Option<Self>> {
2637 let input: protobuf::PhysicalPlanNode =
2638 protobuf::PhysicalPlanNode::try_from_physical_plan(
2639 exec.input().to_owned(),
2640 extension_codec,
2641 )?;
2642 let sort_order = match exec.sort_order() {
2643 Some(requirements) => {
2644 let expr = requirements
2645 .iter()
2646 .map(|requirement| {
2647 let expr: PhysicalSortExpr = requirement.to_owned().into();
2648 let sort_expr = protobuf::PhysicalSortExprNode {
2649 expr: Some(Box::new(serialize_physical_expr(
2650 &expr.expr,
2651 extension_codec,
2652 )?)),
2653 asc: !expr.options.descending,
2654 nulls_first: expr.options.nulls_first,
2655 };
2656 Ok(sort_expr)
2657 })
2658 .collect::<Result<Vec<_>>>()?;
2659 Some(protobuf::PhysicalSortExprNodeCollection {
2660 physical_sort_expr_nodes: expr,
2661 })
2662 }
2663 None => None,
2664 };
2665
2666 if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
2667 return Ok(Some(protobuf::PhysicalPlanNode {
2668 physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
2669 protobuf::JsonSinkExecNode {
2670 input: Some(Box::new(input)),
2671 sink: Some(sink.try_into()?),
2672 sink_schema: Some(exec.schema().as_ref().try_into()?),
2673 sort_order,
2674 },
2675 ))),
2676 }));
2677 }
2678
2679 if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
2680 return Ok(Some(protobuf::PhysicalPlanNode {
2681 physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
2682 protobuf::CsvSinkExecNode {
2683 input: Some(Box::new(input)),
2684 sink: Some(sink.try_into()?),
2685 sink_schema: Some(exec.schema().as_ref().try_into()?),
2686 sort_order,
2687 },
2688 ))),
2689 }));
2690 }
2691
2692 #[cfg(feature = "parquet")]
2693 if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
2694 return Ok(Some(protobuf::PhysicalPlanNode {
2695 physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
2696 protobuf::ParquetSinkExecNode {
2697 input: Some(Box::new(input)),
2698 sink: Some(sink.try_into()?),
2699 sink_schema: Some(exec.schema().as_ref().try_into()?),
2700 sort_order,
2701 },
2702 ))),
2703 }));
2704 }
2705
2706 Ok(None)
2708 }
2709
2710 fn try_from_unnest_exec(
2711 exec: &UnnestExec,
2712 extension_codec: &dyn PhysicalExtensionCodec,
2713 ) -> Result<Self> {
2714 let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2715 exec.input().to_owned(),
2716 extension_codec,
2717 )?;
2718
2719 Ok(protobuf::PhysicalPlanNode {
2720 physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2721 protobuf::UnnestExecNode {
2722 input: Some(Box::new(input)),
2723 schema: Some(exec.schema().try_into()?),
2724 list_type_columns: exec
2725 .list_column_indices()
2726 .iter()
2727 .map(|c| ProtoListUnnest {
2728 index_in_input_schema: c.index_in_input_schema as _,
2729 depth: c.depth as _,
2730 })
2731 .collect(),
2732 struct_type_columns: exec
2733 .struct_column_indices()
2734 .iter()
2735 .map(|c| *c as _)
2736 .collect(),
2737 options: Some(exec.options().into()),
2738 },
2739 ))),
2740 })
2741 }
2742}
2743
2744pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
2745 fn try_decode(buf: &[u8]) -> Result<Self>
2746 where
2747 Self: Sized;
2748
2749 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
2750 where
2751 B: BufMut,
2752 Self: Sized;
2753
2754 fn try_into_physical_plan(
2755 &self,
2756 registry: &dyn FunctionRegistry,
2757 runtime: &RuntimeEnv,
2758 extension_codec: &dyn PhysicalExtensionCodec,
2759 ) -> Result<Arc<dyn ExecutionPlan>>;
2760
2761 fn try_from_physical_plan(
2762 plan: Arc<dyn ExecutionPlan>,
2763 extension_codec: &dyn PhysicalExtensionCodec,
2764 ) -> Result<Self>
2765 where
2766 Self: Sized;
2767}
2768
2769pub trait PhysicalExtensionCodec: Debug + Send + Sync {
2770 fn try_decode(
2771 &self,
2772 buf: &[u8],
2773 inputs: &[Arc<dyn ExecutionPlan>],
2774 registry: &dyn FunctionRegistry,
2775 ) -> Result<Arc<dyn ExecutionPlan>>;
2776
2777 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
2778
2779 fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
2780 not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
2781 }
2782
2783 fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
2784 Ok(())
2785 }
2786
2787 fn try_decode_expr(
2788 &self,
2789 _buf: &[u8],
2790 _inputs: &[Arc<dyn PhysicalExpr>],
2791 ) -> Result<Arc<dyn PhysicalExpr>> {
2792 not_impl_err!("PhysicalExtensionCodec is not provided")
2793 }
2794
2795 fn try_encode_expr(
2796 &self,
2797 _node: &Arc<dyn PhysicalExpr>,
2798 _buf: &mut Vec<u8>,
2799 ) -> Result<()> {
2800 not_impl_err!("PhysicalExtensionCodec is not provided")
2801 }
2802
2803 fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
2804 not_impl_err!(
2805 "PhysicalExtensionCodec is not provided for aggregate function {name}"
2806 )
2807 }
2808
2809 fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
2810 Ok(())
2811 }
2812
2813 fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
2814 not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
2815 }
2816
2817 fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
2818 Ok(())
2819 }
2820}
2821
2822#[derive(Debug)]
2823pub struct DefaultPhysicalExtensionCodec {}
2824
2825impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
2826 fn try_decode(
2827 &self,
2828 _buf: &[u8],
2829 _inputs: &[Arc<dyn ExecutionPlan>],
2830 _registry: &dyn FunctionRegistry,
2831 ) -> Result<Arc<dyn ExecutionPlan>> {
2832 not_impl_err!("PhysicalExtensionCodec is not provided")
2833 }
2834
2835 fn try_encode(
2836 &self,
2837 _node: Arc<dyn ExecutionPlan>,
2838 _buf: &mut Vec<u8>,
2839 ) -> Result<()> {
2840 not_impl_err!("PhysicalExtensionCodec is not provided")
2841 }
2842}
2843
2844fn into_physical_plan(
2845 node: &Option<Box<protobuf::PhysicalPlanNode>>,
2846 registry: &dyn FunctionRegistry,
2847 runtime: &RuntimeEnv,
2848 extension_codec: &dyn PhysicalExtensionCodec,
2849) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2850 if let Some(field) = node {
2851 field.try_into_physical_plan(registry, runtime, extension_codec)
2852 } else {
2853 Err(proto_error("Missing required field in protobuf"))
2854 }
2855}