datafusion_proto/physical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26    parse_physical_window_expr, parse_protobuf_file_scan_config,
27    parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31    serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::{Schema, SchemaRef};
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53    CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::coop::CooperativeExec;
68use datafusion::physical_plan::empty::EmptyExec;
69use datafusion::physical_plan::explain::ExplainExec;
70use datafusion::physical_plan::expressions::PhysicalSortExpr;
71use datafusion::physical_plan::filter::FilterExec;
72use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
73use datafusion::physical_plan::joins::{
74    CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
75};
76use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
77use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
78use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
79use datafusion::physical_plan::projection::ProjectionExec;
80use datafusion::physical_plan::repartition::RepartitionExec;
81use datafusion::physical_plan::sorts::sort::SortExec;
82use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
83use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
84use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
85use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
86use datafusion::physical_plan::{
87    ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
88};
89use datafusion_common::config::TableParquetOptions;
90use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
91use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
92
93use prost::bytes::BufMut;
94use prost::Message;
95
96pub mod from_proto;
97pub mod to_proto;
98
99impl AsExecutionPlan for protobuf::PhysicalPlanNode {
100    fn try_decode(buf: &[u8]) -> Result<Self>
101    where
102        Self: Sized,
103    {
104        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
105            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
106        })
107    }
108
109    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
110    where
111        B: BufMut,
112        Self: Sized,
113    {
114        self.encode(buf).map_err(|e| {
115            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
116        })
117    }
118
119    fn try_into_physical_plan(
120        &self,
121        registry: &dyn FunctionRegistry,
122        runtime: &RuntimeEnv,
123        extension_codec: &dyn PhysicalExtensionCodec,
124    ) -> Result<Arc<dyn ExecutionPlan>> {
125        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
126            proto_error(format!(
127                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
128            ))
129        })?;
130        match plan {
131            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
132                explain,
133                registry,
134                runtime,
135                extension_codec,
136            ),
137            PhysicalPlanType::Projection(projection) => self
138                .try_into_projection_physical_plan(
139                    projection,
140                    registry,
141                    runtime,
142                    extension_codec,
143                ),
144            PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
145                filter,
146                registry,
147                runtime,
148                extension_codec,
149            ),
150            PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
151                scan,
152                registry,
153                runtime,
154                extension_codec,
155            ),
156            PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
157                scan,
158                registry,
159                runtime,
160                extension_codec,
161            ),
162            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
163            PhysicalPlanType::ParquetScan(scan) => self
164                .try_into_parquet_scan_physical_plan(
165                    scan,
166                    registry,
167                    runtime,
168                    extension_codec,
169                ),
170            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
171            PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
172                scan,
173                registry,
174                runtime,
175                extension_codec,
176            ),
177            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
178                .try_into_coalesce_batches_physical_plan(
179                    coalesce_batches,
180                    registry,
181                    runtime,
182                    extension_codec,
183                ),
184            PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
185                merge,
186                registry,
187                runtime,
188                extension_codec,
189            ),
190            PhysicalPlanType::Repartition(repart) => self
191                .try_into_repartition_physical_plan(
192                    repart,
193                    registry,
194                    runtime,
195                    extension_codec,
196                ),
197            PhysicalPlanType::GlobalLimit(limit) => self
198                .try_into_global_limit_physical_plan(
199                    limit,
200                    registry,
201                    runtime,
202                    extension_codec,
203                ),
204            PhysicalPlanType::LocalLimit(limit) => self
205                .try_into_local_limit_physical_plan(
206                    limit,
207                    registry,
208                    runtime,
209                    extension_codec,
210                ),
211            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
212                window_agg,
213                registry,
214                runtime,
215                extension_codec,
216            ),
217            PhysicalPlanType::Aggregate(hash_agg) => self
218                .try_into_aggregate_physical_plan(
219                    hash_agg,
220                    registry,
221                    runtime,
222                    extension_codec,
223                ),
224            PhysicalPlanType::HashJoin(hashjoin) => self
225                .try_into_hash_join_physical_plan(
226                    hashjoin,
227                    registry,
228                    runtime,
229                    extension_codec,
230                ),
231            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
232                .try_into_symmetric_hash_join_physical_plan(
233                    sym_join,
234                    registry,
235                    runtime,
236                    extension_codec,
237                ),
238            PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
239                union,
240                registry,
241                runtime,
242                extension_codec,
243            ),
244            PhysicalPlanType::Interleave(interleave) => self
245                .try_into_interleave_physical_plan(
246                    interleave,
247                    registry,
248                    runtime,
249                    extension_codec,
250                ),
251            PhysicalPlanType::CrossJoin(crossjoin) => self
252                .try_into_cross_join_physical_plan(
253                    crossjoin,
254                    registry,
255                    runtime,
256                    extension_codec,
257                ),
258            PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
259                empty,
260                registry,
261                runtime,
262                extension_codec,
263            ),
264            PhysicalPlanType::PlaceholderRow(placeholder) => self
265                .try_into_placeholder_row_physical_plan(
266                    placeholder,
267                    registry,
268                    runtime,
269                    extension_codec,
270                ),
271            PhysicalPlanType::Sort(sort) => {
272                self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
273            }
274            PhysicalPlanType::SortPreservingMerge(sort) => self
275                .try_into_sort_preserving_merge_physical_plan(
276                    sort,
277                    registry,
278                    runtime,
279                    extension_codec,
280                ),
281            PhysicalPlanType::Extension(extension) => self
282                .try_into_extension_physical_plan(
283                    extension,
284                    registry,
285                    runtime,
286                    extension_codec,
287                ),
288            PhysicalPlanType::NestedLoopJoin(join) => self
289                .try_into_nested_loop_join_physical_plan(
290                    join,
291                    registry,
292                    runtime,
293                    extension_codec,
294                ),
295            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
296                analyze,
297                registry,
298                runtime,
299                extension_codec,
300            ),
301            PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
302                sink,
303                registry,
304                runtime,
305                extension_codec,
306            ),
307            PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
308                sink,
309                registry,
310                runtime,
311                extension_codec,
312            ),
313            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
314            PhysicalPlanType::ParquetSink(sink) => self
315                .try_into_parquet_sink_physical_plan(
316                    sink,
317                    registry,
318                    runtime,
319                    extension_codec,
320                ),
321            PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
322                unnest,
323                registry,
324                runtime,
325                extension_codec,
326            ),
327            PhysicalPlanType::Cooperative(cooperative) => self
328                .try_into_cooperative_physical_plan(
329                    cooperative,
330                    registry,
331                    runtime,
332                    extension_codec,
333                ),
334        }
335    }
336
337    fn try_from_physical_plan(
338        plan: Arc<dyn ExecutionPlan>,
339        extension_codec: &dyn PhysicalExtensionCodec,
340    ) -> Result<Self>
341    where
342        Self: Sized,
343    {
344        let plan_clone = Arc::clone(&plan);
345        let plan = plan.as_any();
346
347        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
348            return protobuf::PhysicalPlanNode::try_from_explain_exec(
349                exec,
350                extension_codec,
351            );
352        }
353
354        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
355            return protobuf::PhysicalPlanNode::try_from_projection_exec(
356                exec,
357                extension_codec,
358            );
359        }
360
361        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
362            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
363                exec,
364                extension_codec,
365            );
366        }
367
368        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
369            return protobuf::PhysicalPlanNode::try_from_filter_exec(
370                exec,
371                extension_codec,
372            );
373        }
374
375        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
376            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
377                limit,
378                extension_codec,
379            );
380        }
381
382        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
383            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
384                limit,
385                extension_codec,
386            );
387        }
388
389        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
390            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
391                exec,
392                extension_codec,
393            );
394        }
395
396        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
397            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
398                exec,
399                extension_codec,
400            );
401        }
402
403        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
404            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
405                exec,
406                extension_codec,
407            );
408        }
409
410        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
411            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
412                exec,
413                extension_codec,
414            );
415        }
416
417        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
418            return protobuf::PhysicalPlanNode::try_from_empty_exec(
419                empty,
420                extension_codec,
421            );
422        }
423
424        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
425            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
426                empty,
427                extension_codec,
428            );
429        }
430
431        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
432            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
433                coalesce_batches,
434                extension_codec,
435            );
436        }
437
438        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
439            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
440                data_source_exec,
441                extension_codec,
442            )? {
443                return Ok(node);
444            }
445        }
446
447        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
448            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
449                exec,
450                extension_codec,
451            );
452        }
453
454        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
455            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
456                exec,
457                extension_codec,
458            );
459        }
460
461        if let Some(exec) = plan.downcast_ref::<SortExec>() {
462            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
463        }
464
465        if let Some(union) = plan.downcast_ref::<UnionExec>() {
466            return protobuf::PhysicalPlanNode::try_from_union_exec(
467                union,
468                extension_codec,
469            );
470        }
471
472        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
473            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
474                interleave,
475                extension_codec,
476            );
477        }
478
479        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
480            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
481                exec,
482                extension_codec,
483            );
484        }
485
486        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
487            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
488                exec,
489                extension_codec,
490            );
491        }
492
493        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
494            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
495                exec,
496                extension_codec,
497            );
498        }
499
500        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
501            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
502                exec,
503                extension_codec,
504            );
505        }
506
507        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
508            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
509                exec,
510                extension_codec,
511            )? {
512                return Ok(node);
513            }
514        }
515
516        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
517            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
518                exec,
519                extension_codec,
520            );
521        }
522
523        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
524            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
525                exec,
526                extension_codec,
527            );
528        }
529
530        let mut buf: Vec<u8> = vec![];
531        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
532            Ok(_) => {
533                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
534                    .children()
535                    .into_iter()
536                    .cloned()
537                    .map(|i| {
538                        protobuf::PhysicalPlanNode::try_from_physical_plan(
539                            i,
540                            extension_codec,
541                        )
542                    })
543                    .collect::<Result<_>>()?;
544
545                Ok(protobuf::PhysicalPlanNode {
546                    physical_plan_type: Some(PhysicalPlanType::Extension(
547                        protobuf::PhysicalExtensionNode { node: buf, inputs },
548                    )),
549                })
550            }
551            Err(e) => internal_err!(
552                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
553            ),
554        }
555    }
556}
557
558impl protobuf::PhysicalPlanNode {
559    fn try_into_explain_physical_plan(
560        &self,
561        explain: &protobuf::ExplainExecNode,
562        _registry: &dyn FunctionRegistry,
563        _runtime: &RuntimeEnv,
564        _extension_codec: &dyn PhysicalExtensionCodec,
565    ) -> Result<Arc<dyn ExecutionPlan>> {
566        Ok(Arc::new(ExplainExec::new(
567            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
568            explain
569                .stringified_plans
570                .iter()
571                .map(|plan| plan.into())
572                .collect(),
573            explain.verbose,
574        )))
575    }
576
577    fn try_into_projection_physical_plan(
578        &self,
579        projection: &protobuf::ProjectionExecNode,
580        registry: &dyn FunctionRegistry,
581        runtime: &RuntimeEnv,
582        extension_codec: &dyn PhysicalExtensionCodec,
583    ) -> Result<Arc<dyn ExecutionPlan>> {
584        let input: Arc<dyn ExecutionPlan> =
585            into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
586        let exprs = projection
587            .expr
588            .iter()
589            .zip(projection.expr_name.iter())
590            .map(|(expr, name)| {
591                Ok((
592                    parse_physical_expr(
593                        expr,
594                        registry,
595                        input.schema().as_ref(),
596                        extension_codec,
597                    )?,
598                    name.to_string(),
599                ))
600            })
601            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
602        Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
603    }
604
605    fn try_into_filter_physical_plan(
606        &self,
607        filter: &protobuf::FilterExecNode,
608        registry: &dyn FunctionRegistry,
609        runtime: &RuntimeEnv,
610        extension_codec: &dyn PhysicalExtensionCodec,
611    ) -> Result<Arc<dyn ExecutionPlan>> {
612        let input: Arc<dyn ExecutionPlan> =
613            into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
614
615        let predicate = filter
616            .expr
617            .as_ref()
618            .map(|expr| {
619                parse_physical_expr(
620                    expr,
621                    registry,
622                    input.schema().as_ref(),
623                    extension_codec,
624                )
625            })
626            .transpose()?
627            .ok_or_else(|| {
628                DataFusionError::Internal(
629                    "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
630                )
631            })?;
632        let filter_selectivity = filter.default_filter_selectivity.try_into();
633        let projection = if !filter.projection.is_empty() {
634            Some(
635                filter
636                    .projection
637                    .iter()
638                    .map(|i| *i as usize)
639                    .collect::<Vec<_>>(),
640            )
641        } else {
642            None
643        };
644        let filter =
645            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
646        match filter_selectivity {
647            Ok(filter_selectivity) => Ok(Arc::new(
648                filter.with_default_selectivity(filter_selectivity)?,
649            )),
650            Err(_) => Err(DataFusionError::Internal(
651                "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
652            )),
653        }
654    }
655
656    fn try_into_csv_scan_physical_plan(
657        &self,
658        scan: &protobuf::CsvScanExecNode,
659        registry: &dyn FunctionRegistry,
660        _runtime: &RuntimeEnv,
661        extension_codec: &dyn PhysicalExtensionCodec,
662    ) -> Result<Arc<dyn ExecutionPlan>> {
663        let escape =
664            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
665                &scan.optional_escape
666            {
667                Some(str_to_byte(escape, "escape")?)
668            } else {
669                None
670            };
671
672        let comment = if let Some(
673            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
674        ) = &scan.optional_comment
675        {
676            Some(str_to_byte(comment, "comment")?)
677        } else {
678            None
679        };
680
681        let source = Arc::new(
682            CsvSource::new(
683                scan.has_header,
684                str_to_byte(&scan.delimiter, "delimiter")?,
685                0,
686            )
687            .with_escape(escape)
688            .with_comment(comment),
689        );
690
691        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
692            scan.base_conf.as_ref().unwrap(),
693            registry,
694            extension_codec,
695            source,
696        )?)
697        .with_newlines_in_values(scan.newlines_in_values)
698        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
699        .build();
700        Ok(DataSourceExec::from_data_source(conf))
701    }
702
703    fn try_into_json_scan_physical_plan(
704        &self,
705        scan: &protobuf::JsonScanExecNode,
706        registry: &dyn FunctionRegistry,
707        _runtime: &RuntimeEnv,
708        extension_codec: &dyn PhysicalExtensionCodec,
709    ) -> Result<Arc<dyn ExecutionPlan>> {
710        let scan_conf = parse_protobuf_file_scan_config(
711            scan.base_conf.as_ref().unwrap(),
712            registry,
713            extension_codec,
714            Arc::new(JsonSource::new()),
715        )?;
716        Ok(DataSourceExec::from_data_source(scan_conf))
717    }
718
719    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
720    fn try_into_parquet_scan_physical_plan(
721        &self,
722        scan: &protobuf::ParquetScanExecNode,
723        registry: &dyn FunctionRegistry,
724        _runtime: &RuntimeEnv,
725        extension_codec: &dyn PhysicalExtensionCodec,
726    ) -> Result<Arc<dyn ExecutionPlan>> {
727        #[cfg(feature = "parquet")]
728        {
729            let schema =
730                parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
731
732            // Check if there's a projection and use projected schema for predicate parsing
733            let base_conf = scan.base_conf.as_ref().unwrap();
734            let predicate_schema = if !base_conf.projection.is_empty() {
735                // Create projected schema for parsing the predicate
736                let projected_fields: Vec<_> = base_conf
737                    .projection
738                    .iter()
739                    .map(|&i| schema.field(i as usize).clone())
740                    .collect();
741                Arc::new(Schema::new(projected_fields))
742            } else {
743                schema
744            };
745
746            let predicate = scan
747                .predicate
748                .as_ref()
749                .map(|expr| {
750                    parse_physical_expr(
751                        expr,
752                        registry,
753                        predicate_schema.as_ref(),
754                        extension_codec,
755                    )
756                })
757                .transpose()?;
758            let mut options = TableParquetOptions::default();
759
760            if let Some(table_options) = scan.parquet_options.as_ref() {
761                options = table_options.try_into()?;
762            }
763            let mut source = ParquetSource::new(options);
764
765            if let Some(predicate) = predicate {
766                source = source.with_predicate(predicate);
767            }
768            let base_config = parse_protobuf_file_scan_config(
769                base_conf,
770                registry,
771                extension_codec,
772                Arc::new(source),
773            )?;
774            Ok(DataSourceExec::from_data_source(base_config))
775        }
776        #[cfg(not(feature = "parquet"))]
777        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
778    }
779
780    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
781    fn try_into_avro_scan_physical_plan(
782        &self,
783        scan: &protobuf::AvroScanExecNode,
784        registry: &dyn FunctionRegistry,
785        _runtime: &RuntimeEnv,
786        extension_codec: &dyn PhysicalExtensionCodec,
787    ) -> Result<Arc<dyn ExecutionPlan>> {
788        #[cfg(feature = "avro")]
789        {
790            let conf = parse_protobuf_file_scan_config(
791                scan.base_conf.as_ref().unwrap(),
792                registry,
793                extension_codec,
794                Arc::new(AvroSource::new()),
795            )?;
796            Ok(DataSourceExec::from_data_source(conf))
797        }
798        #[cfg(not(feature = "avro"))]
799        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
800    }
801
802    fn try_into_coalesce_batches_physical_plan(
803        &self,
804        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
805        registry: &dyn FunctionRegistry,
806        runtime: &RuntimeEnv,
807        extension_codec: &dyn PhysicalExtensionCodec,
808    ) -> Result<Arc<dyn ExecutionPlan>> {
809        let input: Arc<dyn ExecutionPlan> = into_physical_plan(
810            &coalesce_batches.input,
811            registry,
812            runtime,
813            extension_codec,
814        )?;
815        Ok(Arc::new(
816            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
817                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
818        ))
819    }
820
821    fn try_into_merge_physical_plan(
822        &self,
823        merge: &protobuf::CoalescePartitionsExecNode,
824        registry: &dyn FunctionRegistry,
825        runtime: &RuntimeEnv,
826        extension_codec: &dyn PhysicalExtensionCodec,
827    ) -> Result<Arc<dyn ExecutionPlan>> {
828        let input: Arc<dyn ExecutionPlan> =
829            into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
830        Ok(Arc::new(
831            CoalescePartitionsExec::new(input)
832                .with_fetch(merge.fetch.map(|f| f as usize)),
833        ))
834    }
835
836    fn try_into_repartition_physical_plan(
837        &self,
838        repart: &protobuf::RepartitionExecNode,
839        registry: &dyn FunctionRegistry,
840        runtime: &RuntimeEnv,
841        extension_codec: &dyn PhysicalExtensionCodec,
842    ) -> Result<Arc<dyn ExecutionPlan>> {
843        let input: Arc<dyn ExecutionPlan> =
844            into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
845        let partitioning = parse_protobuf_partitioning(
846            repart.partitioning.as_ref(),
847            registry,
848            input.schema().as_ref(),
849            extension_codec,
850        )?;
851        Ok(Arc::new(RepartitionExec::try_new(
852            input,
853            partitioning.unwrap(),
854        )?))
855    }
856
857    fn try_into_global_limit_physical_plan(
858        &self,
859        limit: &protobuf::GlobalLimitExecNode,
860        registry: &dyn FunctionRegistry,
861        runtime: &RuntimeEnv,
862        extension_codec: &dyn PhysicalExtensionCodec,
863    ) -> Result<Arc<dyn ExecutionPlan>> {
864        let input: Arc<dyn ExecutionPlan> =
865            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
866        let fetch = if limit.fetch >= 0 {
867            Some(limit.fetch as usize)
868        } else {
869            None
870        };
871        Ok(Arc::new(GlobalLimitExec::new(
872            input,
873            limit.skip as usize,
874            fetch,
875        )))
876    }
877
878    fn try_into_local_limit_physical_plan(
879        &self,
880        limit: &protobuf::LocalLimitExecNode,
881        registry: &dyn FunctionRegistry,
882        runtime: &RuntimeEnv,
883        extension_codec: &dyn PhysicalExtensionCodec,
884    ) -> Result<Arc<dyn ExecutionPlan>> {
885        let input: Arc<dyn ExecutionPlan> =
886            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
887        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
888    }
889
890    fn try_into_window_physical_plan(
891        &self,
892        window_agg: &protobuf::WindowAggExecNode,
893        registry: &dyn FunctionRegistry,
894        runtime: &RuntimeEnv,
895        extension_codec: &dyn PhysicalExtensionCodec,
896    ) -> Result<Arc<dyn ExecutionPlan>> {
897        let input: Arc<dyn ExecutionPlan> =
898            into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
899        let input_schema = input.schema();
900
901        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
902            .window_expr
903            .iter()
904            .map(|window_expr| {
905                parse_physical_window_expr(
906                    window_expr,
907                    registry,
908                    input_schema.as_ref(),
909                    extension_codec,
910                )
911            })
912            .collect::<Result<Vec<_>, _>>()?;
913
914        let partition_keys = window_agg
915            .partition_keys
916            .iter()
917            .map(|expr| {
918                parse_physical_expr(
919                    expr,
920                    registry,
921                    input.schema().as_ref(),
922                    extension_codec,
923                )
924            })
925            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
926
927        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
928            let input_order_mode = match input_order_mode {
929                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
930                window_agg_exec_node::InputOrderMode::PartiallySorted(
931                    protobuf::PartiallySortedInputOrderMode { columns },
932                ) => InputOrderMode::PartiallySorted(
933                    columns.iter().map(|c| *c as usize).collect(),
934                ),
935                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
936            };
937
938            Ok(Arc::new(BoundedWindowAggExec::try_new(
939                physical_window_expr,
940                input,
941                input_order_mode,
942                !partition_keys.is_empty(),
943            )?))
944        } else {
945            Ok(Arc::new(WindowAggExec::try_new(
946                physical_window_expr,
947                input,
948                !partition_keys.is_empty(),
949            )?))
950        }
951    }
952
953    fn try_into_aggregate_physical_plan(
954        &self,
955        hash_agg: &protobuf::AggregateExecNode,
956        registry: &dyn FunctionRegistry,
957        runtime: &RuntimeEnv,
958        extension_codec: &dyn PhysicalExtensionCodec,
959    ) -> Result<Arc<dyn ExecutionPlan>> {
960        let input: Arc<dyn ExecutionPlan> =
961            into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
962        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
963            proto_error(format!(
964                "Received a AggregateNode message with unknown AggregateMode {}",
965                hash_agg.mode
966            ))
967        })?;
968        let agg_mode: AggregateMode = match mode {
969            protobuf::AggregateMode::Partial => AggregateMode::Partial,
970            protobuf::AggregateMode::Final => AggregateMode::Final,
971            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
972            protobuf::AggregateMode::Single => AggregateMode::Single,
973            protobuf::AggregateMode::SinglePartitioned => {
974                AggregateMode::SinglePartitioned
975            }
976        };
977
978        let num_expr = hash_agg.group_expr.len();
979
980        let group_expr = hash_agg
981            .group_expr
982            .iter()
983            .zip(hash_agg.group_expr_name.iter())
984            .map(|(expr, name)| {
985                parse_physical_expr(
986                    expr,
987                    registry,
988                    input.schema().as_ref(),
989                    extension_codec,
990                )
991                .map(|expr| (expr, name.to_string()))
992            })
993            .collect::<Result<Vec<_>, _>>()?;
994
995        let null_expr = hash_agg
996            .null_expr
997            .iter()
998            .zip(hash_agg.group_expr_name.iter())
999            .map(|(expr, name)| {
1000                parse_physical_expr(
1001                    expr,
1002                    registry,
1003                    input.schema().as_ref(),
1004                    extension_codec,
1005                )
1006                .map(|expr| (expr, name.to_string()))
1007            })
1008            .collect::<Result<Vec<_>, _>>()?;
1009
1010        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
1011            hash_agg
1012                .groups
1013                .chunks(num_expr)
1014                .map(|g| g.to_vec())
1015                .collect::<Vec<Vec<bool>>>()
1016        } else {
1017            vec![]
1018        };
1019
1020        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
1021            DataFusionError::Internal(
1022                "input_schema in AggregateNode is missing.".to_owned(),
1023            )
1024        })?;
1025        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
1026
1027        let physical_filter_expr = hash_agg
1028            .filter_expr
1029            .iter()
1030            .map(|expr| {
1031                expr.expr
1032                    .as_ref()
1033                    .map(|e| {
1034                        parse_physical_expr(
1035                            e,
1036                            registry,
1037                            &physical_schema,
1038                            extension_codec,
1039                        )
1040                    })
1041                    .transpose()
1042            })
1043            .collect::<Result<Vec<_>, _>>()?;
1044
1045        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1046            .aggr_expr
1047            .iter()
1048            .zip(hash_agg.aggr_expr_name.iter())
1049            .map(|(expr, name)| {
1050                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1051                    proto_error("Unexpected empty aggregate physical expression")
1052                })?;
1053
1054                match expr_type {
1055                    ExprType::AggregateExpr(agg_node) => {
1056                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1057                            .expr
1058                            .iter()
1059                            .map(|e| {
1060                                parse_physical_expr(
1061                                    e,
1062                                    registry,
1063                                    &physical_schema,
1064                                    extension_codec,
1065                                )
1066                            })
1067                            .collect::<Result<Vec<_>>>()?;
1068                        let order_bys = agg_node
1069                            .ordering_req
1070                            .iter()
1071                            .map(|e| {
1072                                parse_physical_sort_expr(
1073                                    e,
1074                                    registry,
1075                                    &physical_schema,
1076                                    extension_codec,
1077                                )
1078                            })
1079                            .collect::<Result<_>>()?;
1080                        agg_node
1081                            .aggregate_function
1082                            .as_ref()
1083                            .map(|func| match func {
1084                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1085                                    let agg_udf = match &agg_node.fun_definition {
1086                                        Some(buf) => extension_codec
1087                                            .try_decode_udaf(udaf_name, buf)?,
1088                                        None => {
1089                                            registry.udaf(udaf_name).or_else(|_| {
1090                                                extension_codec
1091                                                    .try_decode_udaf(udaf_name, &[])
1092                                            })?
1093                                        }
1094                                    };
1095
1096                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
1097                                        .schema(Arc::clone(&physical_schema))
1098                                        .alias(name)
1099                                        .with_ignore_nulls(agg_node.ignore_nulls)
1100                                        .with_distinct(agg_node.distinct)
1101                                        .order_by(order_bys)
1102                                        .build()
1103                                        .map(Arc::new)
1104                                }
1105                            })
1106                            .transpose()?
1107                            .ok_or_else(|| {
1108                                proto_error(
1109                                    "Invalid AggregateExpr, missing aggregate_function",
1110                                )
1111                            })
1112                    }
1113                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1114                }
1115            })
1116            .collect::<Result<Vec<_>, _>>()?;
1117
1118        let limit = hash_agg
1119            .limit
1120            .as_ref()
1121            .map(|lit_value| lit_value.limit as usize);
1122
1123        let agg = AggregateExec::try_new(
1124            agg_mode,
1125            PhysicalGroupBy::new(group_expr, null_expr, groups),
1126            physical_aggr_expr,
1127            physical_filter_expr,
1128            input,
1129            physical_schema,
1130        )?;
1131
1132        let agg = agg.with_limit(limit);
1133
1134        Ok(Arc::new(agg))
1135    }
1136
1137    fn try_into_hash_join_physical_plan(
1138        &self,
1139        hashjoin: &protobuf::HashJoinExecNode,
1140        registry: &dyn FunctionRegistry,
1141        runtime: &RuntimeEnv,
1142        extension_codec: &dyn PhysicalExtensionCodec,
1143    ) -> Result<Arc<dyn ExecutionPlan>> {
1144        let left: Arc<dyn ExecutionPlan> =
1145            into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
1146        let right: Arc<dyn ExecutionPlan> =
1147            into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
1148        let left_schema = left.schema();
1149        let right_schema = right.schema();
1150        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1151            .on
1152            .iter()
1153            .map(|col| {
1154                let left = parse_physical_expr(
1155                    &col.left.clone().unwrap(),
1156                    registry,
1157                    left_schema.as_ref(),
1158                    extension_codec,
1159                )?;
1160                let right = parse_physical_expr(
1161                    &col.right.clone().unwrap(),
1162                    registry,
1163                    right_schema.as_ref(),
1164                    extension_codec,
1165                )?;
1166                Ok((left, right))
1167            })
1168            .collect::<Result<_>>()?;
1169        let join_type =
1170            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1171                proto_error(format!(
1172                    "Received a HashJoinNode message with unknown JoinType {}",
1173                    hashjoin.join_type
1174                ))
1175            })?;
1176        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1177            .map_err(|_| {
1178                proto_error(format!(
1179                    "Received a HashJoinNode message with unknown NullEquality {}",
1180                    hashjoin.null_equality
1181                ))
1182            })?;
1183        let filter = hashjoin
1184            .filter
1185            .as_ref()
1186            .map(|f| {
1187                let schema = f
1188                    .schema
1189                    .as_ref()
1190                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1191                    .try_into()?;
1192
1193                let expression = parse_physical_expr(
1194                    f.expression.as_ref().ok_or_else(|| {
1195                        proto_error("Unexpected empty filter expression")
1196                    })?,
1197                    registry, &schema,
1198                    extension_codec,
1199                )?;
1200                let column_indices = f.column_indices
1201                    .iter()
1202                    .map(|i| {
1203                        let side = protobuf::JoinSide::try_from(i.side)
1204                            .map_err(|_| proto_error(format!(
1205                                "Received a HashJoinNode message with JoinSide in Filter {}",
1206                                i.side))
1207                            )?;
1208
1209                        Ok(ColumnIndex {
1210                            index: i.index as usize,
1211                            side: side.into(),
1212                        })
1213                    })
1214                    .collect::<Result<Vec<_>>>()?;
1215
1216                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1217            })
1218            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1219
1220        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1221            .map_err(|_| {
1222            proto_error(format!(
1223                "Received a HashJoinNode message with unknown PartitionMode {}",
1224                hashjoin.partition_mode
1225            ))
1226        })?;
1227        let partition_mode = match partition_mode {
1228            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1229            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1230            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1231        };
1232        let projection = if !hashjoin.projection.is_empty() {
1233            Some(
1234                hashjoin
1235                    .projection
1236                    .iter()
1237                    .map(|i| *i as usize)
1238                    .collect::<Vec<_>>(),
1239            )
1240        } else {
1241            None
1242        };
1243        Ok(Arc::new(HashJoinExec::try_new(
1244            left,
1245            right,
1246            on,
1247            filter,
1248            &join_type.into(),
1249            projection,
1250            partition_mode,
1251            null_equality.into(),
1252        )?))
1253    }
1254
1255    fn try_into_symmetric_hash_join_physical_plan(
1256        &self,
1257        sym_join: &protobuf::SymmetricHashJoinExecNode,
1258        registry: &dyn FunctionRegistry,
1259        runtime: &RuntimeEnv,
1260        extension_codec: &dyn PhysicalExtensionCodec,
1261    ) -> Result<Arc<dyn ExecutionPlan>> {
1262        let left =
1263            into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1264        let right =
1265            into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1266        let left_schema = left.schema();
1267        let right_schema = right.schema();
1268        let on = sym_join
1269            .on
1270            .iter()
1271            .map(|col| {
1272                let left = parse_physical_expr(
1273                    &col.left.clone().unwrap(),
1274                    registry,
1275                    left_schema.as_ref(),
1276                    extension_codec,
1277                )?;
1278                let right = parse_physical_expr(
1279                    &col.right.clone().unwrap(),
1280                    registry,
1281                    right_schema.as_ref(),
1282                    extension_codec,
1283                )?;
1284                Ok((left, right))
1285            })
1286            .collect::<Result<_>>()?;
1287        let join_type =
1288            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1289                proto_error(format!(
1290                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1291                    sym_join.join_type
1292                ))
1293            })?;
1294        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1295            .map_err(|_| {
1296                proto_error(format!(
1297                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
1298                    sym_join.null_equality
1299                ))
1300            })?;
1301        let filter = sym_join
1302            .filter
1303            .as_ref()
1304            .map(|f| {
1305                let schema = f
1306                    .schema
1307                    .as_ref()
1308                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1309                    .try_into()?;
1310
1311                let expression = parse_physical_expr(
1312                    f.expression.as_ref().ok_or_else(|| {
1313                        proto_error("Unexpected empty filter expression")
1314                    })?,
1315                    registry, &schema,
1316                    extension_codec,
1317                )?;
1318                let column_indices = f.column_indices
1319                    .iter()
1320                    .map(|i| {
1321                        let side = protobuf::JoinSide::try_from(i.side)
1322                            .map_err(|_| proto_error(format!(
1323                                "Received a HashJoinNode message with JoinSide in Filter {}",
1324                                i.side))
1325                            )?;
1326
1327                        Ok(ColumnIndex {
1328                            index: i.index as usize,
1329                            side: side.into(),
1330                        })
1331                    })
1332                    .collect::<Result<_>>()?;
1333
1334                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1335            })
1336            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1337
1338        let left_sort_exprs = parse_physical_sort_exprs(
1339            &sym_join.left_sort_exprs,
1340            registry,
1341            &left_schema,
1342            extension_codec,
1343        )?;
1344        let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1345
1346        let right_sort_exprs = parse_physical_sort_exprs(
1347            &sym_join.right_sort_exprs,
1348            registry,
1349            &right_schema,
1350            extension_codec,
1351        )?;
1352        let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1353
1354        let partition_mode = protobuf::StreamPartitionMode::try_from(
1355            sym_join.partition_mode,
1356        )
1357        .map_err(|_| {
1358            proto_error(format!(
1359                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1360                sym_join.partition_mode
1361            ))
1362        })?;
1363        let partition_mode = match partition_mode {
1364            protobuf::StreamPartitionMode::SinglePartition => {
1365                StreamJoinPartitionMode::SinglePartition
1366            }
1367            protobuf::StreamPartitionMode::PartitionedExec => {
1368                StreamJoinPartitionMode::Partitioned
1369            }
1370        };
1371        SymmetricHashJoinExec::try_new(
1372            left,
1373            right,
1374            on,
1375            filter,
1376            &join_type.into(),
1377            null_equality.into(),
1378            left_sort_exprs,
1379            right_sort_exprs,
1380            partition_mode,
1381        )
1382        .map(|e| Arc::new(e) as _)
1383    }
1384
1385    fn try_into_union_physical_plan(
1386        &self,
1387        union: &protobuf::UnionExecNode,
1388        registry: &dyn FunctionRegistry,
1389        runtime: &RuntimeEnv,
1390        extension_codec: &dyn PhysicalExtensionCodec,
1391    ) -> Result<Arc<dyn ExecutionPlan>> {
1392        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1393        for input in &union.inputs {
1394            inputs.push(input.try_into_physical_plan(
1395                registry,
1396                runtime,
1397                extension_codec,
1398            )?);
1399        }
1400        Ok(Arc::new(UnionExec::new(inputs)))
1401    }
1402
1403    fn try_into_interleave_physical_plan(
1404        &self,
1405        interleave: &protobuf::InterleaveExecNode,
1406        registry: &dyn FunctionRegistry,
1407        runtime: &RuntimeEnv,
1408        extension_codec: &dyn PhysicalExtensionCodec,
1409    ) -> Result<Arc<dyn ExecutionPlan>> {
1410        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1411        for input in &interleave.inputs {
1412            inputs.push(input.try_into_physical_plan(
1413                registry,
1414                runtime,
1415                extension_codec,
1416            )?);
1417        }
1418        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1419    }
1420
1421    fn try_into_cross_join_physical_plan(
1422        &self,
1423        crossjoin: &protobuf::CrossJoinExecNode,
1424        registry: &dyn FunctionRegistry,
1425        runtime: &RuntimeEnv,
1426        extension_codec: &dyn PhysicalExtensionCodec,
1427    ) -> Result<Arc<dyn ExecutionPlan>> {
1428        let left: Arc<dyn ExecutionPlan> =
1429            into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1430        let right: Arc<dyn ExecutionPlan> =
1431            into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1432        Ok(Arc::new(CrossJoinExec::new(left, right)))
1433    }
1434
1435    fn try_into_empty_physical_plan(
1436        &self,
1437        empty: &protobuf::EmptyExecNode,
1438        _registry: &dyn FunctionRegistry,
1439        _runtime: &RuntimeEnv,
1440        _extension_codec: &dyn PhysicalExtensionCodec,
1441    ) -> Result<Arc<dyn ExecutionPlan>> {
1442        let schema = Arc::new(convert_required!(empty.schema)?);
1443        Ok(Arc::new(EmptyExec::new(schema)))
1444    }
1445
1446    fn try_into_placeholder_row_physical_plan(
1447        &self,
1448        placeholder: &protobuf::PlaceholderRowExecNode,
1449        _registry: &dyn FunctionRegistry,
1450        _runtime: &RuntimeEnv,
1451        _extension_codec: &dyn PhysicalExtensionCodec,
1452    ) -> Result<Arc<dyn ExecutionPlan>> {
1453        let schema = Arc::new(convert_required!(placeholder.schema)?);
1454        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1455    }
1456
1457    fn try_into_sort_physical_plan(
1458        &self,
1459        sort: &protobuf::SortExecNode,
1460        registry: &dyn FunctionRegistry,
1461        runtime: &RuntimeEnv,
1462        extension_codec: &dyn PhysicalExtensionCodec,
1463    ) -> Result<Arc<dyn ExecutionPlan>> {
1464        let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1465        let exprs = sort
1466            .expr
1467            .iter()
1468            .map(|expr| {
1469                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1470                    proto_error(format!(
1471                        "physical_plan::from_proto() Unexpected expr {self:?}"
1472                    ))
1473                })?;
1474                if let ExprType::Sort(sort_expr) = expr {
1475                    let expr = sort_expr
1476                        .expr
1477                        .as_ref()
1478                        .ok_or_else(|| {
1479                            proto_error(format!(
1480                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
1481                            ))
1482                        })?
1483                        .as_ref();
1484                    Ok(PhysicalSortExpr {
1485                        expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
1486                        options: SortOptions {
1487                            descending: !sort_expr.asc,
1488                            nulls_first: sort_expr.nulls_first,
1489                        },
1490                    })
1491                } else {
1492                    internal_err!(
1493                        "physical_plan::from_proto() {self:?}"
1494                    )
1495                }
1496            })
1497            .collect::<Result<Vec<_>>>()?;
1498        let Some(ordering) = LexOrdering::new(exprs) else {
1499            return internal_err!("SortExec requires an ordering");
1500        };
1501        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1502        let new_sort = SortExec::new(ordering, input)
1503            .with_fetch(fetch)
1504            .with_preserve_partitioning(sort.preserve_partitioning);
1505
1506        Ok(Arc::new(new_sort))
1507    }
1508
1509    fn try_into_sort_preserving_merge_physical_plan(
1510        &self,
1511        sort: &protobuf::SortPreservingMergeExecNode,
1512        registry: &dyn FunctionRegistry,
1513        runtime: &RuntimeEnv,
1514        extension_codec: &dyn PhysicalExtensionCodec,
1515    ) -> Result<Arc<dyn ExecutionPlan>> {
1516        let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1517        let exprs = sort
1518            .expr
1519            .iter()
1520            .map(|expr| {
1521                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1522                    proto_error(format!(
1523                        "physical_plan::from_proto() Unexpected expr {self:?}"
1524                    ))
1525                })?;
1526                if let ExprType::Sort(sort_expr) = expr {
1527                    let expr = sort_expr
1528                        .expr
1529                        .as_ref()
1530                        .ok_or_else(|| {
1531                            proto_error(format!(
1532                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1533                        ))
1534                        })?
1535                        .as_ref();
1536                    Ok(PhysicalSortExpr {
1537                        expr: parse_physical_expr(
1538                            expr,
1539                            registry,
1540                            input.schema().as_ref(),
1541                            extension_codec,
1542                        )?,
1543                        options: SortOptions {
1544                            descending: !sort_expr.asc,
1545                            nulls_first: sort_expr.nulls_first,
1546                        },
1547                    })
1548                } else {
1549                    internal_err!("physical_plan::from_proto() {self:?}")
1550                }
1551            })
1552            .collect::<Result<Vec<_>>>()?;
1553        let Some(ordering) = LexOrdering::new(exprs) else {
1554            return internal_err!("SortExec requires an ordering");
1555        };
1556        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1557        Ok(Arc::new(
1558            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1559        ))
1560    }
1561
1562    fn try_into_extension_physical_plan(
1563        &self,
1564        extension: &protobuf::PhysicalExtensionNode,
1565        registry: &dyn FunctionRegistry,
1566        runtime: &RuntimeEnv,
1567        extension_codec: &dyn PhysicalExtensionCodec,
1568    ) -> Result<Arc<dyn ExecutionPlan>> {
1569        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1570            .inputs
1571            .iter()
1572            .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1573            .collect::<Result<_>>()?;
1574
1575        let extension_node =
1576            extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1577
1578        Ok(extension_node)
1579    }
1580
1581    fn try_into_nested_loop_join_physical_plan(
1582        &self,
1583        join: &protobuf::NestedLoopJoinExecNode,
1584        registry: &dyn FunctionRegistry,
1585        runtime: &RuntimeEnv,
1586        extension_codec: &dyn PhysicalExtensionCodec,
1587    ) -> Result<Arc<dyn ExecutionPlan>> {
1588        let left: Arc<dyn ExecutionPlan> =
1589            into_physical_plan(&join.left, registry, runtime, extension_codec)?;
1590        let right: Arc<dyn ExecutionPlan> =
1591            into_physical_plan(&join.right, registry, runtime, extension_codec)?;
1592        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1593            proto_error(format!(
1594                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1595                join.join_type
1596            ))
1597        })?;
1598        let filter = join
1599                    .filter
1600                    .as_ref()
1601                    .map(|f| {
1602                        let schema = f
1603                            .schema
1604                            .as_ref()
1605                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1606                            .try_into()?;
1607
1608                        let expression = parse_physical_expr(
1609                            f.expression.as_ref().ok_or_else(|| {
1610                                proto_error("Unexpected empty filter expression")
1611                            })?,
1612                            registry, &schema,
1613                            extension_codec,
1614                        )?;
1615                        let column_indices = f.column_indices
1616                            .iter()
1617                            .map(|i| {
1618                                let side = protobuf::JoinSide::try_from(i.side)
1619                                    .map_err(|_| proto_error(format!(
1620                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1621                                        i.side))
1622                                    )?;
1623
1624                                Ok(ColumnIndex {
1625                                    index: i.index as usize,
1626                                    side: side.into(),
1627                                })
1628                            })
1629                            .collect::<Result<Vec<_>>>()?;
1630
1631                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1632                    })
1633                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1634
1635        let projection = if !join.projection.is_empty() {
1636            Some(
1637                join.projection
1638                    .iter()
1639                    .map(|i| *i as usize)
1640                    .collect::<Vec<_>>(),
1641            )
1642        } else {
1643            None
1644        };
1645
1646        Ok(Arc::new(NestedLoopJoinExec::try_new(
1647            left,
1648            right,
1649            filter,
1650            &join_type.into(),
1651            projection,
1652        )?))
1653    }
1654
1655    fn try_into_analyze_physical_plan(
1656        &self,
1657        analyze: &protobuf::AnalyzeExecNode,
1658        registry: &dyn FunctionRegistry,
1659        runtime: &RuntimeEnv,
1660        extension_codec: &dyn PhysicalExtensionCodec,
1661    ) -> Result<Arc<dyn ExecutionPlan>> {
1662        let input: Arc<dyn ExecutionPlan> =
1663            into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1664        Ok(Arc::new(AnalyzeExec::new(
1665            analyze.verbose,
1666            analyze.show_statistics,
1667            input,
1668            Arc::new(convert_required!(analyze.schema)?),
1669        )))
1670    }
1671
1672    fn try_into_json_sink_physical_plan(
1673        &self,
1674        sink: &protobuf::JsonSinkExecNode,
1675        registry: &dyn FunctionRegistry,
1676        runtime: &RuntimeEnv,
1677        extension_codec: &dyn PhysicalExtensionCodec,
1678    ) -> Result<Arc<dyn ExecutionPlan>> {
1679        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1680
1681        let data_sink: JsonSink = sink
1682            .sink
1683            .as_ref()
1684            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1685            .try_into()?;
1686        let sink_schema = input.schema();
1687        let sort_order = sink
1688            .sort_order
1689            .as_ref()
1690            .map(|collection| {
1691                parse_physical_sort_exprs(
1692                    &collection.physical_sort_expr_nodes,
1693                    registry,
1694                    &sink_schema,
1695                    extension_codec,
1696                )
1697                .map(|sort_exprs| {
1698                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1699                })
1700            })
1701            .transpose()?
1702            .flatten();
1703        Ok(Arc::new(DataSinkExec::new(
1704            input,
1705            Arc::new(data_sink),
1706            sort_order,
1707        )))
1708    }
1709
1710    fn try_into_csv_sink_physical_plan(
1711        &self,
1712        sink: &protobuf::CsvSinkExecNode,
1713        registry: &dyn FunctionRegistry,
1714        runtime: &RuntimeEnv,
1715        extension_codec: &dyn PhysicalExtensionCodec,
1716    ) -> Result<Arc<dyn ExecutionPlan>> {
1717        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1718
1719        let data_sink: CsvSink = sink
1720            .sink
1721            .as_ref()
1722            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1723            .try_into()?;
1724        let sink_schema = input.schema();
1725        let sort_order = sink
1726            .sort_order
1727            .as_ref()
1728            .map(|collection| {
1729                parse_physical_sort_exprs(
1730                    &collection.physical_sort_expr_nodes,
1731                    registry,
1732                    &sink_schema,
1733                    extension_codec,
1734                )
1735                .map(|sort_exprs| {
1736                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1737                })
1738            })
1739            .transpose()?
1740            .flatten();
1741        Ok(Arc::new(DataSinkExec::new(
1742            input,
1743            Arc::new(data_sink),
1744            sort_order,
1745        )))
1746    }
1747
1748    fn try_into_parquet_sink_physical_plan(
1749        &self,
1750        sink: &protobuf::ParquetSinkExecNode,
1751        registry: &dyn FunctionRegistry,
1752        runtime: &RuntimeEnv,
1753        extension_codec: &dyn PhysicalExtensionCodec,
1754    ) -> Result<Arc<dyn ExecutionPlan>> {
1755        #[cfg(feature = "parquet")]
1756        {
1757            let input =
1758                into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1759
1760            let data_sink: ParquetSink = sink
1761                .sink
1762                .as_ref()
1763                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1764                .try_into()?;
1765            let sink_schema = input.schema();
1766            let sort_order = sink
1767                .sort_order
1768                .as_ref()
1769                .map(|collection| {
1770                    parse_physical_sort_exprs(
1771                        &collection.physical_sort_expr_nodes,
1772                        registry,
1773                        &sink_schema,
1774                        extension_codec,
1775                    )
1776                    .map(|sort_exprs| {
1777                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1778                    })
1779                })
1780                .transpose()?
1781                .flatten();
1782            Ok(Arc::new(DataSinkExec::new(
1783                input,
1784                Arc::new(data_sink),
1785                sort_order,
1786            )))
1787        }
1788        #[cfg(not(feature = "parquet"))]
1789        panic!("Trying to use ParquetSink without `parquet` feature enabled");
1790    }
1791
1792    fn try_into_unnest_physical_plan(
1793        &self,
1794        unnest: &protobuf::UnnestExecNode,
1795        registry: &dyn FunctionRegistry,
1796        runtime: &RuntimeEnv,
1797        extension_codec: &dyn PhysicalExtensionCodec,
1798    ) -> Result<Arc<dyn ExecutionPlan>> {
1799        let input =
1800            into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1801
1802        Ok(Arc::new(UnnestExec::new(
1803            input,
1804            unnest
1805                .list_type_columns
1806                .iter()
1807                .map(|c| ListUnnest {
1808                    index_in_input_schema: c.index_in_input_schema as _,
1809                    depth: c.depth as _,
1810                })
1811                .collect(),
1812            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1813            Arc::new(convert_required!(unnest.schema)?),
1814            into_required!(unnest.options)?,
1815        )))
1816    }
1817
1818    fn try_into_cooperative_physical_plan(
1819        &self,
1820        field_stream: &protobuf::CooperativeExecNode,
1821        registry: &dyn FunctionRegistry,
1822        runtime: &RuntimeEnv,
1823        extension_codec: &dyn PhysicalExtensionCodec,
1824    ) -> Result<Arc<dyn ExecutionPlan>> {
1825        let input =
1826            into_physical_plan(&field_stream.input, registry, runtime, extension_codec)?;
1827        Ok(Arc::new(CooperativeExec::new(input)))
1828    }
1829
1830    fn try_from_explain_exec(
1831        exec: &ExplainExec,
1832        _extension_codec: &dyn PhysicalExtensionCodec,
1833    ) -> Result<Self> {
1834        Ok(protobuf::PhysicalPlanNode {
1835            physical_plan_type: Some(PhysicalPlanType::Explain(
1836                protobuf::ExplainExecNode {
1837                    schema: Some(exec.schema().as_ref().try_into()?),
1838                    stringified_plans: exec
1839                        .stringified_plans()
1840                        .iter()
1841                        .map(|plan| plan.into())
1842                        .collect(),
1843                    verbose: exec.verbose(),
1844                },
1845            )),
1846        })
1847    }
1848
1849    fn try_from_projection_exec(
1850        exec: &ProjectionExec,
1851        extension_codec: &dyn PhysicalExtensionCodec,
1852    ) -> Result<Self> {
1853        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1854            exec.input().to_owned(),
1855            extension_codec,
1856        )?;
1857        let expr = exec
1858            .expr()
1859            .iter()
1860            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1861            .collect::<Result<Vec<_>>>()?;
1862        let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1863        Ok(protobuf::PhysicalPlanNode {
1864            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1865                protobuf::ProjectionExecNode {
1866                    input: Some(Box::new(input)),
1867                    expr,
1868                    expr_name,
1869                },
1870            ))),
1871        })
1872    }
1873
1874    fn try_from_analyze_exec(
1875        exec: &AnalyzeExec,
1876        extension_codec: &dyn PhysicalExtensionCodec,
1877    ) -> Result<Self> {
1878        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1879            exec.input().to_owned(),
1880            extension_codec,
1881        )?;
1882        Ok(protobuf::PhysicalPlanNode {
1883            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1884                protobuf::AnalyzeExecNode {
1885                    verbose: exec.verbose(),
1886                    show_statistics: exec.show_statistics(),
1887                    input: Some(Box::new(input)),
1888                    schema: Some(exec.schema().as_ref().try_into()?),
1889                },
1890            ))),
1891        })
1892    }
1893
1894    fn try_from_filter_exec(
1895        exec: &FilterExec,
1896        extension_codec: &dyn PhysicalExtensionCodec,
1897    ) -> Result<Self> {
1898        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1899            exec.input().to_owned(),
1900            extension_codec,
1901        )?;
1902        Ok(protobuf::PhysicalPlanNode {
1903            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1904                protobuf::FilterExecNode {
1905                    input: Some(Box::new(input)),
1906                    expr: Some(serialize_physical_expr(
1907                        exec.predicate(),
1908                        extension_codec,
1909                    )?),
1910                    default_filter_selectivity: exec.default_selectivity() as u32,
1911                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1912                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1913                    }),
1914                },
1915            ))),
1916        })
1917    }
1918
1919    fn try_from_global_limit_exec(
1920        limit: &GlobalLimitExec,
1921        extension_codec: &dyn PhysicalExtensionCodec,
1922    ) -> Result<Self> {
1923        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1924            limit.input().to_owned(),
1925            extension_codec,
1926        )?;
1927
1928        Ok(protobuf::PhysicalPlanNode {
1929            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1930                protobuf::GlobalLimitExecNode {
1931                    input: Some(Box::new(input)),
1932                    skip: limit.skip() as u32,
1933                    fetch: match limit.fetch() {
1934                        Some(n) => n as i64,
1935                        _ => -1, // no limit
1936                    },
1937                },
1938            ))),
1939        })
1940    }
1941
1942    fn try_from_local_limit_exec(
1943        limit: &LocalLimitExec,
1944        extension_codec: &dyn PhysicalExtensionCodec,
1945    ) -> Result<Self> {
1946        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1947            limit.input().to_owned(),
1948            extension_codec,
1949        )?;
1950        Ok(protobuf::PhysicalPlanNode {
1951            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1952                protobuf::LocalLimitExecNode {
1953                    input: Some(Box::new(input)),
1954                    fetch: limit.fetch() as u32,
1955                },
1956            ))),
1957        })
1958    }
1959
1960    fn try_from_hash_join_exec(
1961        exec: &HashJoinExec,
1962        extension_codec: &dyn PhysicalExtensionCodec,
1963    ) -> Result<Self> {
1964        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1965            exec.left().to_owned(),
1966            extension_codec,
1967        )?;
1968        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1969            exec.right().to_owned(),
1970            extension_codec,
1971        )?;
1972        let on: Vec<protobuf::JoinOn> = exec
1973            .on()
1974            .iter()
1975            .map(|tuple| {
1976                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1977                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1978                Ok::<_, DataFusionError>(protobuf::JoinOn {
1979                    left: Some(l),
1980                    right: Some(r),
1981                })
1982            })
1983            .collect::<Result<_>>()?;
1984        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1985        let null_equality: protobuf::NullEquality = exec.null_equality().into();
1986        let filter = exec
1987            .filter()
1988            .as_ref()
1989            .map(|f| {
1990                let expression =
1991                    serialize_physical_expr(f.expression(), extension_codec)?;
1992                let column_indices = f
1993                    .column_indices()
1994                    .iter()
1995                    .map(|i| {
1996                        let side: protobuf::JoinSide = i.side.to_owned().into();
1997                        protobuf::ColumnIndex {
1998                            index: i.index as u32,
1999                            side: side.into(),
2000                        }
2001                    })
2002                    .collect();
2003                let schema = f.schema().as_ref().try_into()?;
2004                Ok(protobuf::JoinFilter {
2005                    expression: Some(expression),
2006                    column_indices,
2007                    schema: Some(schema),
2008                })
2009            })
2010            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2011
2012        let partition_mode = match exec.partition_mode() {
2013            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2014            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2015            PartitionMode::Auto => protobuf::PartitionMode::Auto,
2016        };
2017
2018        Ok(protobuf::PhysicalPlanNode {
2019            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2020                protobuf::HashJoinExecNode {
2021                    left: Some(Box::new(left)),
2022                    right: Some(Box::new(right)),
2023                    on,
2024                    join_type: join_type.into(),
2025                    partition_mode: partition_mode.into(),
2026                    null_equality: null_equality.into(),
2027                    filter,
2028                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2029                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2030                    }),
2031                },
2032            ))),
2033        })
2034    }
2035
2036    fn try_from_symmetric_hash_join_exec(
2037        exec: &SymmetricHashJoinExec,
2038        extension_codec: &dyn PhysicalExtensionCodec,
2039    ) -> Result<Self> {
2040        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2041            exec.left().to_owned(),
2042            extension_codec,
2043        )?;
2044        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2045            exec.right().to_owned(),
2046            extension_codec,
2047        )?;
2048        let on = exec
2049            .on()
2050            .iter()
2051            .map(|tuple| {
2052                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2053                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2054                Ok::<_, DataFusionError>(protobuf::JoinOn {
2055                    left: Some(l),
2056                    right: Some(r),
2057                })
2058            })
2059            .collect::<Result<_>>()?;
2060        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2061        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2062        let filter = exec
2063            .filter()
2064            .as_ref()
2065            .map(|f| {
2066                let expression =
2067                    serialize_physical_expr(f.expression(), extension_codec)?;
2068                let column_indices = f
2069                    .column_indices()
2070                    .iter()
2071                    .map(|i| {
2072                        let side: protobuf::JoinSide = i.side.to_owned().into();
2073                        protobuf::ColumnIndex {
2074                            index: i.index as u32,
2075                            side: side.into(),
2076                        }
2077                    })
2078                    .collect();
2079                let schema = f.schema().as_ref().try_into()?;
2080                Ok(protobuf::JoinFilter {
2081                    expression: Some(expression),
2082                    column_indices,
2083                    schema: Some(schema),
2084                })
2085            })
2086            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2087
2088        let partition_mode = match exec.partition_mode() {
2089            StreamJoinPartitionMode::SinglePartition => {
2090                protobuf::StreamPartitionMode::SinglePartition
2091            }
2092            StreamJoinPartitionMode::Partitioned => {
2093                protobuf::StreamPartitionMode::PartitionedExec
2094            }
2095        };
2096
2097        let left_sort_exprs = exec
2098            .left_sort_exprs()
2099            .map(|exprs| {
2100                exprs
2101                    .iter()
2102                    .map(|expr| {
2103                        Ok(protobuf::PhysicalSortExprNode {
2104                            expr: Some(Box::new(serialize_physical_expr(
2105                                &expr.expr,
2106                                extension_codec,
2107                            )?)),
2108                            asc: !expr.options.descending,
2109                            nulls_first: expr.options.nulls_first,
2110                        })
2111                    })
2112                    .collect::<Result<Vec<_>>>()
2113            })
2114            .transpose()?
2115            .unwrap_or(vec![]);
2116
2117        let right_sort_exprs = exec
2118            .right_sort_exprs()
2119            .map(|exprs| {
2120                exprs
2121                    .iter()
2122                    .map(|expr| {
2123                        Ok(protobuf::PhysicalSortExprNode {
2124                            expr: Some(Box::new(serialize_physical_expr(
2125                                &expr.expr,
2126                                extension_codec,
2127                            )?)),
2128                            asc: !expr.options.descending,
2129                            nulls_first: expr.options.nulls_first,
2130                        })
2131                    })
2132                    .collect::<Result<Vec<_>>>()
2133            })
2134            .transpose()?
2135            .unwrap_or(vec![]);
2136
2137        Ok(protobuf::PhysicalPlanNode {
2138            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2139                protobuf::SymmetricHashJoinExecNode {
2140                    left: Some(Box::new(left)),
2141                    right: Some(Box::new(right)),
2142                    on,
2143                    join_type: join_type.into(),
2144                    partition_mode: partition_mode.into(),
2145                    null_equality: null_equality.into(),
2146                    left_sort_exprs,
2147                    right_sort_exprs,
2148                    filter,
2149                },
2150            ))),
2151        })
2152    }
2153
2154    fn try_from_cross_join_exec(
2155        exec: &CrossJoinExec,
2156        extension_codec: &dyn PhysicalExtensionCodec,
2157    ) -> Result<Self> {
2158        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2159            exec.left().to_owned(),
2160            extension_codec,
2161        )?;
2162        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2163            exec.right().to_owned(),
2164            extension_codec,
2165        )?;
2166        Ok(protobuf::PhysicalPlanNode {
2167            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2168                protobuf::CrossJoinExecNode {
2169                    left: Some(Box::new(left)),
2170                    right: Some(Box::new(right)),
2171                },
2172            ))),
2173        })
2174    }
2175
2176    fn try_from_aggregate_exec(
2177        exec: &AggregateExec,
2178        extension_codec: &dyn PhysicalExtensionCodec,
2179    ) -> Result<Self> {
2180        let groups: Vec<bool> = exec
2181            .group_expr()
2182            .groups()
2183            .iter()
2184            .flatten()
2185            .copied()
2186            .collect();
2187
2188        let group_names = exec
2189            .group_expr()
2190            .expr()
2191            .iter()
2192            .map(|expr| expr.1.to_owned())
2193            .collect();
2194
2195        let filter = exec
2196            .filter_expr()
2197            .iter()
2198            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2199            .collect::<Result<Vec<_>>>()?;
2200
2201        let agg = exec
2202            .aggr_expr()
2203            .iter()
2204            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2205            .collect::<Result<Vec<_>>>()?;
2206
2207        let agg_names = exec
2208            .aggr_expr()
2209            .iter()
2210            .map(|expr| expr.name().to_string())
2211            .collect::<Vec<_>>();
2212
2213        let agg_mode = match exec.mode() {
2214            AggregateMode::Partial => protobuf::AggregateMode::Partial,
2215            AggregateMode::Final => protobuf::AggregateMode::Final,
2216            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2217            AggregateMode::Single => protobuf::AggregateMode::Single,
2218            AggregateMode::SinglePartitioned => {
2219                protobuf::AggregateMode::SinglePartitioned
2220            }
2221        };
2222        let input_schema = exec.input_schema();
2223        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2224            exec.input().to_owned(),
2225            extension_codec,
2226        )?;
2227
2228        let null_expr = exec
2229            .group_expr()
2230            .null_expr()
2231            .iter()
2232            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2233            .collect::<Result<Vec<_>>>()?;
2234
2235        let group_expr = exec
2236            .group_expr()
2237            .expr()
2238            .iter()
2239            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2240            .collect::<Result<Vec<_>>>()?;
2241
2242        let limit = exec.limit().map(|value| protobuf::AggLimit {
2243            limit: value as u64,
2244        });
2245
2246        Ok(protobuf::PhysicalPlanNode {
2247            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2248                protobuf::AggregateExecNode {
2249                    group_expr,
2250                    group_expr_name: group_names,
2251                    aggr_expr: agg,
2252                    filter_expr: filter,
2253                    aggr_expr_name: agg_names,
2254                    mode: agg_mode as i32,
2255                    input: Some(Box::new(input)),
2256                    input_schema: Some(input_schema.as_ref().try_into()?),
2257                    null_expr,
2258                    groups,
2259                    limit,
2260                },
2261            ))),
2262        })
2263    }
2264
2265    fn try_from_empty_exec(
2266        empty: &EmptyExec,
2267        _extension_codec: &dyn PhysicalExtensionCodec,
2268    ) -> Result<Self> {
2269        let schema = empty.schema().as_ref().try_into()?;
2270        Ok(protobuf::PhysicalPlanNode {
2271            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2272                schema: Some(schema),
2273            })),
2274        })
2275    }
2276
2277    fn try_from_placeholder_row_exec(
2278        empty: &PlaceholderRowExec,
2279        _extension_codec: &dyn PhysicalExtensionCodec,
2280    ) -> Result<Self> {
2281        let schema = empty.schema().as_ref().try_into()?;
2282        Ok(protobuf::PhysicalPlanNode {
2283            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2284                protobuf::PlaceholderRowExecNode {
2285                    schema: Some(schema),
2286                },
2287            )),
2288        })
2289    }
2290
2291    fn try_from_coalesce_batches_exec(
2292        coalesce_batches: &CoalesceBatchesExec,
2293        extension_codec: &dyn PhysicalExtensionCodec,
2294    ) -> Result<Self> {
2295        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2296            coalesce_batches.input().to_owned(),
2297            extension_codec,
2298        )?;
2299        Ok(protobuf::PhysicalPlanNode {
2300            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2301                protobuf::CoalesceBatchesExecNode {
2302                    input: Some(Box::new(input)),
2303                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2304                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2305                },
2306            ))),
2307        })
2308    }
2309
2310    fn try_from_data_source_exec(
2311        data_source_exec: &DataSourceExec,
2312        extension_codec: &dyn PhysicalExtensionCodec,
2313    ) -> Result<Option<Self>> {
2314        let data_source = data_source_exec.data_source();
2315        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2316            let source = maybe_csv.file_source();
2317            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2318                return Ok(Some(protobuf::PhysicalPlanNode {
2319                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
2320                        protobuf::CsvScanExecNode {
2321                            base_conf: Some(serialize_file_scan_config(
2322                                maybe_csv,
2323                                extension_codec,
2324                            )?),
2325                            has_header: csv_config.has_header(),
2326                            delimiter: byte_to_string(
2327                                csv_config.delimiter(),
2328                                "delimiter",
2329                            )?,
2330                            quote: byte_to_string(csv_config.quote(), "quote")?,
2331                            optional_escape: if let Some(escape) = csv_config.escape() {
2332                                Some(
2333                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2334                                        byte_to_string(escape, "escape")?,
2335                                    ),
2336                                )
2337                            } else {
2338                                None
2339                            },
2340                            optional_comment: if let Some(comment) = csv_config.comment()
2341                            {
2342                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2343                                        byte_to_string(comment, "comment")?,
2344                                    ))
2345                            } else {
2346                                None
2347                            },
2348                            newlines_in_values: maybe_csv.newlines_in_values(),
2349                        },
2350                    )),
2351                }));
2352            }
2353        }
2354
2355        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2356            let source = scan_conf.file_source();
2357            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2358                return Ok(Some(protobuf::PhysicalPlanNode {
2359                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
2360                        protobuf::JsonScanExecNode {
2361                            base_conf: Some(serialize_file_scan_config(
2362                                scan_conf,
2363                                extension_codec,
2364                            )?),
2365                        },
2366                    )),
2367                }));
2368            }
2369        }
2370
2371        #[cfg(feature = "parquet")]
2372        if let Some((maybe_parquet, conf)) =
2373            data_source_exec.downcast_to_file_source::<ParquetSource>()
2374        {
2375            let predicate = conf
2376                .predicate()
2377                .map(|pred| serialize_physical_expr(pred, extension_codec))
2378                .transpose()?;
2379            return Ok(Some(protobuf::PhysicalPlanNode {
2380                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2381                    protobuf::ParquetScanExecNode {
2382                        base_conf: Some(serialize_file_scan_config(
2383                            maybe_parquet,
2384                            extension_codec,
2385                        )?),
2386                        predicate,
2387                        parquet_options: Some(conf.table_parquet_options().try_into()?),
2388                    },
2389                )),
2390            }));
2391        }
2392
2393        #[cfg(feature = "avro")]
2394        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2395            let source = maybe_avro.file_source();
2396            if source.as_any().downcast_ref::<AvroSource>().is_some() {
2397                return Ok(Some(protobuf::PhysicalPlanNode {
2398                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
2399                        protobuf::AvroScanExecNode {
2400                            base_conf: Some(serialize_file_scan_config(
2401                                maybe_avro,
2402                                extension_codec,
2403                            )?),
2404                        },
2405                    )),
2406                }));
2407            }
2408        }
2409
2410        Ok(None)
2411    }
2412
2413    fn try_from_coalesce_partitions_exec(
2414        exec: &CoalescePartitionsExec,
2415        extension_codec: &dyn PhysicalExtensionCodec,
2416    ) -> Result<Self> {
2417        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2418            exec.input().to_owned(),
2419            extension_codec,
2420        )?;
2421        Ok(protobuf::PhysicalPlanNode {
2422            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2423                protobuf::CoalescePartitionsExecNode {
2424                    input: Some(Box::new(input)),
2425                    fetch: exec.fetch().map(|f| f as u32),
2426                },
2427            ))),
2428        })
2429    }
2430
2431    fn try_from_repartition_exec(
2432        exec: &RepartitionExec,
2433        extension_codec: &dyn PhysicalExtensionCodec,
2434    ) -> Result<Self> {
2435        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2436            exec.input().to_owned(),
2437            extension_codec,
2438        )?;
2439
2440        let pb_partitioning =
2441            serialize_partitioning(exec.partitioning(), extension_codec)?;
2442
2443        Ok(protobuf::PhysicalPlanNode {
2444            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2445                protobuf::RepartitionExecNode {
2446                    input: Some(Box::new(input)),
2447                    partitioning: Some(pb_partitioning),
2448                },
2449            ))),
2450        })
2451    }
2452
2453    fn try_from_sort_exec(
2454        exec: &SortExec,
2455        extension_codec: &dyn PhysicalExtensionCodec,
2456    ) -> Result<Self> {
2457        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2458            exec.input().to_owned(),
2459            extension_codec,
2460        )?;
2461        let expr = exec
2462            .expr()
2463            .iter()
2464            .map(|expr| {
2465                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2466                    expr: Some(Box::new(serialize_physical_expr(
2467                        &expr.expr,
2468                        extension_codec,
2469                    )?)),
2470                    asc: !expr.options.descending,
2471                    nulls_first: expr.options.nulls_first,
2472                });
2473                Ok(protobuf::PhysicalExprNode {
2474                    expr_type: Some(ExprType::Sort(sort_expr)),
2475                })
2476            })
2477            .collect::<Result<Vec<_>>>()?;
2478        Ok(protobuf::PhysicalPlanNode {
2479            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2480                protobuf::SortExecNode {
2481                    input: Some(Box::new(input)),
2482                    expr,
2483                    fetch: match exec.fetch() {
2484                        Some(n) => n as i64,
2485                        _ => -1,
2486                    },
2487                    preserve_partitioning: exec.preserve_partitioning(),
2488                },
2489            ))),
2490        })
2491    }
2492
2493    fn try_from_union_exec(
2494        union: &UnionExec,
2495        extension_codec: &dyn PhysicalExtensionCodec,
2496    ) -> Result<Self> {
2497        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2498        for input in union.inputs() {
2499            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2500                input.to_owned(),
2501                extension_codec,
2502            )?);
2503        }
2504        Ok(protobuf::PhysicalPlanNode {
2505            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2506                inputs,
2507            })),
2508        })
2509    }
2510
2511    fn try_from_interleave_exec(
2512        interleave: &InterleaveExec,
2513        extension_codec: &dyn PhysicalExtensionCodec,
2514    ) -> Result<Self> {
2515        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2516        for input in interleave.inputs() {
2517            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2518                input.to_owned(),
2519                extension_codec,
2520            )?);
2521        }
2522        Ok(protobuf::PhysicalPlanNode {
2523            physical_plan_type: Some(PhysicalPlanType::Interleave(
2524                protobuf::InterleaveExecNode { inputs },
2525            )),
2526        })
2527    }
2528
2529    fn try_from_sort_preserving_merge_exec(
2530        exec: &SortPreservingMergeExec,
2531        extension_codec: &dyn PhysicalExtensionCodec,
2532    ) -> Result<Self> {
2533        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2534            exec.input().to_owned(),
2535            extension_codec,
2536        )?;
2537        let expr = exec
2538            .expr()
2539            .iter()
2540            .map(|expr| {
2541                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2542                    expr: Some(Box::new(serialize_physical_expr(
2543                        &expr.expr,
2544                        extension_codec,
2545                    )?)),
2546                    asc: !expr.options.descending,
2547                    nulls_first: expr.options.nulls_first,
2548                });
2549                Ok(protobuf::PhysicalExprNode {
2550                    expr_type: Some(ExprType::Sort(sort_expr)),
2551                })
2552            })
2553            .collect::<Result<Vec<_>>>()?;
2554        Ok(protobuf::PhysicalPlanNode {
2555            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2556                protobuf::SortPreservingMergeExecNode {
2557                    input: Some(Box::new(input)),
2558                    expr,
2559                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2560                },
2561            ))),
2562        })
2563    }
2564
2565    fn try_from_nested_loop_join_exec(
2566        exec: &NestedLoopJoinExec,
2567        extension_codec: &dyn PhysicalExtensionCodec,
2568    ) -> Result<Self> {
2569        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2570            exec.left().to_owned(),
2571            extension_codec,
2572        )?;
2573        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2574            exec.right().to_owned(),
2575            extension_codec,
2576        )?;
2577
2578        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2579        let filter = exec
2580            .filter()
2581            .as_ref()
2582            .map(|f| {
2583                let expression =
2584                    serialize_physical_expr(f.expression(), extension_codec)?;
2585                let column_indices = f
2586                    .column_indices()
2587                    .iter()
2588                    .map(|i| {
2589                        let side: protobuf::JoinSide = i.side.to_owned().into();
2590                        protobuf::ColumnIndex {
2591                            index: i.index as u32,
2592                            side: side.into(),
2593                        }
2594                    })
2595                    .collect();
2596                let schema = f.schema().as_ref().try_into()?;
2597                Ok(protobuf::JoinFilter {
2598                    expression: Some(expression),
2599                    column_indices,
2600                    schema: Some(schema),
2601                })
2602            })
2603            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2604
2605        Ok(protobuf::PhysicalPlanNode {
2606            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2607                protobuf::NestedLoopJoinExecNode {
2608                    left: Some(Box::new(left)),
2609                    right: Some(Box::new(right)),
2610                    join_type: join_type.into(),
2611                    filter,
2612                    projection: exec.projection().map_or_else(Vec::new, |v| {
2613                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2614                    }),
2615                },
2616            ))),
2617        })
2618    }
2619
2620    fn try_from_window_agg_exec(
2621        exec: &WindowAggExec,
2622        extension_codec: &dyn PhysicalExtensionCodec,
2623    ) -> Result<Self> {
2624        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2625            exec.input().to_owned(),
2626            extension_codec,
2627        )?;
2628
2629        let window_expr = exec
2630            .window_expr()
2631            .iter()
2632            .map(|e| serialize_physical_window_expr(e, extension_codec))
2633            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2634
2635        let partition_keys = exec
2636            .partition_keys()
2637            .iter()
2638            .map(|e| serialize_physical_expr(e, extension_codec))
2639            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2640
2641        Ok(protobuf::PhysicalPlanNode {
2642            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2643                protobuf::WindowAggExecNode {
2644                    input: Some(Box::new(input)),
2645                    window_expr,
2646                    partition_keys,
2647                    input_order_mode: None,
2648                },
2649            ))),
2650        })
2651    }
2652
2653    fn try_from_bounded_window_agg_exec(
2654        exec: &BoundedWindowAggExec,
2655        extension_codec: &dyn PhysicalExtensionCodec,
2656    ) -> Result<Self> {
2657        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2658            exec.input().to_owned(),
2659            extension_codec,
2660        )?;
2661
2662        let window_expr = exec
2663            .window_expr()
2664            .iter()
2665            .map(|e| serialize_physical_window_expr(e, extension_codec))
2666            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2667
2668        let partition_keys = exec
2669            .partition_keys()
2670            .iter()
2671            .map(|e| serialize_physical_expr(e, extension_codec))
2672            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2673
2674        let input_order_mode = match &exec.input_order_mode {
2675            InputOrderMode::Linear => {
2676                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2677            }
2678            InputOrderMode::PartiallySorted(columns) => {
2679                window_agg_exec_node::InputOrderMode::PartiallySorted(
2680                    protobuf::PartiallySortedInputOrderMode {
2681                        columns: columns.iter().map(|c| *c as u64).collect(),
2682                    },
2683                )
2684            }
2685            InputOrderMode::Sorted => {
2686                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2687            }
2688        };
2689
2690        Ok(protobuf::PhysicalPlanNode {
2691            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2692                protobuf::WindowAggExecNode {
2693                    input: Some(Box::new(input)),
2694                    window_expr,
2695                    partition_keys,
2696                    input_order_mode: Some(input_order_mode),
2697                },
2698            ))),
2699        })
2700    }
2701
2702    fn try_from_data_sink_exec(
2703        exec: &DataSinkExec,
2704        extension_codec: &dyn PhysicalExtensionCodec,
2705    ) -> Result<Option<Self>> {
2706        let input: protobuf::PhysicalPlanNode =
2707            protobuf::PhysicalPlanNode::try_from_physical_plan(
2708                exec.input().to_owned(),
2709                extension_codec,
2710            )?;
2711        let sort_order = match exec.sort_order() {
2712            Some(requirements) => {
2713                let expr = requirements
2714                    .iter()
2715                    .map(|requirement| {
2716                        let expr: PhysicalSortExpr = requirement.to_owned().into();
2717                        let sort_expr = protobuf::PhysicalSortExprNode {
2718                            expr: Some(Box::new(serialize_physical_expr(
2719                                &expr.expr,
2720                                extension_codec,
2721                            )?)),
2722                            asc: !expr.options.descending,
2723                            nulls_first: expr.options.nulls_first,
2724                        };
2725                        Ok(sort_expr)
2726                    })
2727                    .collect::<Result<Vec<_>>>()?;
2728                Some(protobuf::PhysicalSortExprNodeCollection {
2729                    physical_sort_expr_nodes: expr,
2730                })
2731            }
2732            None => None,
2733        };
2734
2735        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
2736            return Ok(Some(protobuf::PhysicalPlanNode {
2737                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
2738                    protobuf::JsonSinkExecNode {
2739                        input: Some(Box::new(input)),
2740                        sink: Some(sink.try_into()?),
2741                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2742                        sort_order,
2743                    },
2744                ))),
2745            }));
2746        }
2747
2748        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
2749            return Ok(Some(protobuf::PhysicalPlanNode {
2750                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
2751                    protobuf::CsvSinkExecNode {
2752                        input: Some(Box::new(input)),
2753                        sink: Some(sink.try_into()?),
2754                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2755                        sort_order,
2756                    },
2757                ))),
2758            }));
2759        }
2760
2761        #[cfg(feature = "parquet")]
2762        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
2763            return Ok(Some(protobuf::PhysicalPlanNode {
2764                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
2765                    protobuf::ParquetSinkExecNode {
2766                        input: Some(Box::new(input)),
2767                        sink: Some(sink.try_into()?),
2768                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2769                        sort_order,
2770                    },
2771                ))),
2772            }));
2773        }
2774
2775        // If unknown DataSink then let extension handle it
2776        Ok(None)
2777    }
2778
2779    fn try_from_unnest_exec(
2780        exec: &UnnestExec,
2781        extension_codec: &dyn PhysicalExtensionCodec,
2782    ) -> Result<Self> {
2783        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2784            exec.input().to_owned(),
2785            extension_codec,
2786        )?;
2787
2788        Ok(protobuf::PhysicalPlanNode {
2789            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2790                protobuf::UnnestExecNode {
2791                    input: Some(Box::new(input)),
2792                    schema: Some(exec.schema().try_into()?),
2793                    list_type_columns: exec
2794                        .list_column_indices()
2795                        .iter()
2796                        .map(|c| ProtoListUnnest {
2797                            index_in_input_schema: c.index_in_input_schema as _,
2798                            depth: c.depth as _,
2799                        })
2800                        .collect(),
2801                    struct_type_columns: exec
2802                        .struct_column_indices()
2803                        .iter()
2804                        .map(|c| *c as _)
2805                        .collect(),
2806                    options: Some(exec.options().into()),
2807                },
2808            ))),
2809        })
2810    }
2811
2812    fn try_from_cooperative_exec(
2813        exec: &CooperativeExec,
2814        extension_codec: &dyn PhysicalExtensionCodec,
2815    ) -> Result<Self> {
2816        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2817            exec.input().to_owned(),
2818            extension_codec,
2819        )?;
2820
2821        Ok(protobuf::PhysicalPlanNode {
2822            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
2823                protobuf::CooperativeExecNode {
2824                    input: Some(Box::new(input)),
2825                },
2826            ))),
2827        })
2828    }
2829}
2830
/// Conversion interface between a serializable plan representation and an
/// [`ExecutionPlan`].
///
/// Implementors can be decoded from and encoded to a byte buffer, and can be
/// converted to and from an `Arc<dyn ExecutionPlan>`.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes an instance of the implementing type from the bytes in `buf`.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` into the writable buffer `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Builds an executable plan from `self`.
    ///
    /// `registry`, `runtime` and `extension_codec` are made available to the
    /// implementation while it reconstructs the plan.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the serializable representation of `plan`, with `extension_codec`
    /// available to the implementation during the conversion.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
2855
/// Hooks for encoding and decoding plan nodes, expressions and user-defined
/// functions as raw bytes.
///
/// Only [`try_decode`](Self::try_decode) and [`try_encode`](Self::try_encode)
/// are required. Every other method has a default: the `try_decode_*` defaults
/// and `try_encode_expr` return a not-implemented error, while the
/// `try_encode_ud*f` defaults write nothing and return `Ok(())`.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes an execution plan from `buf`, given its already-built `inputs`
    /// and a function `registry`.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes the execution plan `node` into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes the scalar UDF called `name` from `_buf`.
    ///
    /// The default implementation returns a not-implemented error naming the
    /// function.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF into `_buf`.
    ///
    /// The default implementation leaves the buffer untouched and succeeds.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a physical expression from `_buf`, given its `_inputs`.
    ///
    /// The default implementation returns a not-implemented error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a physical expression into `_buf`.
    ///
    /// The default implementation returns a not-implemented error.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes the aggregate UDF called `name` from `_buf`.
    ///
    /// The default implementation returns a not-implemented error naming the
    /// function.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF into `_buf`.
    ///
    /// The default implementation leaves the buffer untouched and succeeds.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window UDF called `name` from `_buf`.
    ///
    /// The default implementation returns a not-implemented error naming the
    /// function.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF into `_buf`.
    ///
    /// The default implementation leaves the buffer untouched and succeeds.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
2908
/// A [`PhysicalExtensionCodec`] that supports no extensions: encoding or
/// decoding an execution plan through it always yields a not-implemented error.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
2911
// Both required methods reject every request; the remaining trait methods
// keep their default implementations.
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    /// Always fails with a not-implemented error; all arguments are ignored.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Always fails with a not-implemented error; nothing is written to `_buf`.
    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
2930
2931fn into_physical_plan(
2932    node: &Option<Box<protobuf::PhysicalPlanNode>>,
2933    registry: &dyn FunctionRegistry,
2934    runtime: &RuntimeEnv,
2935    extension_codec: &dyn PhysicalExtensionCodec,
2936) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2937    if let Some(field) = node {
2938        field.try_into_physical_plan(registry, runtime, extension_codec)
2939    } else {
2940        Err(proto_error("Missing required field in protobuf"))
2941    }
2942}