datafusion_proto/physical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26    parse_physical_window_expr, parse_protobuf_file_scan_config,
27    parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31    serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::SchemaRef;
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53    CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::empty::EmptyExec;
68use datafusion::physical_plan::explain::ExplainExec;
69use datafusion::physical_plan::expressions::PhysicalSortExpr;
70use datafusion::physical_plan::filter::FilterExec;
71use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
72use datafusion::physical_plan::joins::{
73    CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
74};
75use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
76use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
77use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
78use datafusion::physical_plan::projection::ProjectionExec;
79use datafusion::physical_plan::repartition::RepartitionExec;
80use datafusion::physical_plan::sorts::sort::SortExec;
81use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
82use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
83use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
84use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
85use datafusion::physical_plan::{
86    ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
87};
88use datafusion_common::config::TableParquetOptions;
89use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
90use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
91
92use prost::bytes::BufMut;
93use prost::Message;
94
95pub mod from_proto;
96pub mod to_proto;
97
98impl AsExecutionPlan for protobuf::PhysicalPlanNode {
99    fn try_decode(buf: &[u8]) -> Result<Self>
100    where
101        Self: Sized,
102    {
103        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
104            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
105        })
106    }
107
108    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
109    where
110        B: BufMut,
111        Self: Sized,
112    {
113        self.encode(buf).map_err(|e| {
114            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
115        })
116    }
117
118    fn try_into_physical_plan(
119        &self,
120        registry: &dyn FunctionRegistry,
121        runtime: &RuntimeEnv,
122        extension_codec: &dyn PhysicalExtensionCodec,
123    ) -> Result<Arc<dyn ExecutionPlan>> {
124        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
125            proto_error(format!(
126                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
127            ))
128        })?;
129        match plan {
130            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
131                explain,
132                registry,
133                runtime,
134                extension_codec,
135            ),
136            PhysicalPlanType::Projection(projection) => self
137                .try_into_projection_physical_plan(
138                    projection,
139                    registry,
140                    runtime,
141                    extension_codec,
142                ),
143            PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
144                filter,
145                registry,
146                runtime,
147                extension_codec,
148            ),
149            PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
150                scan,
151                registry,
152                runtime,
153                extension_codec,
154            ),
155            PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
156                scan,
157                registry,
158                runtime,
159                extension_codec,
160            ),
161            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
162            PhysicalPlanType::ParquetScan(scan) => self
163                .try_into_parquet_scan_physical_plan(
164                    scan,
165                    registry,
166                    runtime,
167                    extension_codec,
168                ),
169            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
170            PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
171                scan,
172                registry,
173                runtime,
174                extension_codec,
175            ),
176            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
177                .try_into_coalesce_batches_physical_plan(
178                    coalesce_batches,
179                    registry,
180                    runtime,
181                    extension_codec,
182                ),
183            PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
184                merge,
185                registry,
186                runtime,
187                extension_codec,
188            ),
189            PhysicalPlanType::Repartition(repart) => self
190                .try_into_repartition_physical_plan(
191                    repart,
192                    registry,
193                    runtime,
194                    extension_codec,
195                ),
196            PhysicalPlanType::GlobalLimit(limit) => self
197                .try_into_global_limit_physical_plan(
198                    limit,
199                    registry,
200                    runtime,
201                    extension_codec,
202                ),
203            PhysicalPlanType::LocalLimit(limit) => self
204                .try_into_local_limit_physical_plan(
205                    limit,
206                    registry,
207                    runtime,
208                    extension_codec,
209                ),
210            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
211                window_agg,
212                registry,
213                runtime,
214                extension_codec,
215            ),
216            PhysicalPlanType::Aggregate(hash_agg) => self
217                .try_into_aggregate_physical_plan(
218                    hash_agg,
219                    registry,
220                    runtime,
221                    extension_codec,
222                ),
223            PhysicalPlanType::HashJoin(hashjoin) => self
224                .try_into_hash_join_physical_plan(
225                    hashjoin,
226                    registry,
227                    runtime,
228                    extension_codec,
229                ),
230            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
231                .try_into_symmetric_hash_join_physical_plan(
232                    sym_join,
233                    registry,
234                    runtime,
235                    extension_codec,
236                ),
237            PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
238                union,
239                registry,
240                runtime,
241                extension_codec,
242            ),
243            PhysicalPlanType::Interleave(interleave) => self
244                .try_into_interleave_physical_plan(
245                    interleave,
246                    registry,
247                    runtime,
248                    extension_codec,
249                ),
250            PhysicalPlanType::CrossJoin(crossjoin) => self
251                .try_into_cross_join_physical_plan(
252                    crossjoin,
253                    registry,
254                    runtime,
255                    extension_codec,
256                ),
257            PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
258                empty,
259                registry,
260                runtime,
261                extension_codec,
262            ),
263            PhysicalPlanType::PlaceholderRow(placeholder) => self
264                .try_into_placeholder_row_physical_plan(
265                    placeholder,
266                    registry,
267                    runtime,
268                    extension_codec,
269                ),
270            PhysicalPlanType::Sort(sort) => {
271                self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
272            }
273            PhysicalPlanType::SortPreservingMerge(sort) => self
274                .try_into_sort_preserving_merge_physical_plan(
275                    sort,
276                    registry,
277                    runtime,
278                    extension_codec,
279                ),
280            PhysicalPlanType::Extension(extension) => self
281                .try_into_extension_physical_plan(
282                    extension,
283                    registry,
284                    runtime,
285                    extension_codec,
286                ),
287            PhysicalPlanType::NestedLoopJoin(join) => self
288                .try_into_nested_loop_join_physical_plan(
289                    join,
290                    registry,
291                    runtime,
292                    extension_codec,
293                ),
294            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
295                analyze,
296                registry,
297                runtime,
298                extension_codec,
299            ),
300            PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
301                sink,
302                registry,
303                runtime,
304                extension_codec,
305            ),
306            PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
307                sink,
308                registry,
309                runtime,
310                extension_codec,
311            ),
312
313            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
314            PhysicalPlanType::ParquetSink(sink) => self
315                .try_into_parquet_sink_physical_plan(
316                    sink,
317                    registry,
318                    runtime,
319                    extension_codec,
320                ),
321            PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
322                unnest,
323                registry,
324                runtime,
325                extension_codec,
326            ),
327        }
328    }
329
330    fn try_from_physical_plan(
331        plan: Arc<dyn ExecutionPlan>,
332        extension_codec: &dyn PhysicalExtensionCodec,
333    ) -> Result<Self>
334    where
335        Self: Sized,
336    {
337        let plan_clone = Arc::clone(&plan);
338        let plan = plan.as_any();
339
340        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
341            return protobuf::PhysicalPlanNode::try_from_explain_exec(
342                exec,
343                extension_codec,
344            );
345        }
346
347        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
348            return protobuf::PhysicalPlanNode::try_from_projection_exec(
349                exec,
350                extension_codec,
351            );
352        }
353
354        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
355            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
356                exec,
357                extension_codec,
358            );
359        }
360
361        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
362            return protobuf::PhysicalPlanNode::try_from_filter_exec(
363                exec,
364                extension_codec,
365            );
366        }
367
368        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
369            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
370                limit,
371                extension_codec,
372            );
373        }
374
375        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
376            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
377                limit,
378                extension_codec,
379            );
380        }
381
382        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
383            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
384                exec,
385                extension_codec,
386            );
387        }
388
389        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
390            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
391                exec,
392                extension_codec,
393            );
394        }
395
396        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
397            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
398                exec,
399                extension_codec,
400            );
401        }
402
403        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
404            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
405                exec,
406                extension_codec,
407            );
408        }
409
410        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
411            return protobuf::PhysicalPlanNode::try_from_empty_exec(
412                empty,
413                extension_codec,
414            );
415        }
416
417        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
418            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
419                empty,
420                extension_codec,
421            );
422        }
423
424        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
425            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
426                coalesce_batches,
427                extension_codec,
428            );
429        }
430
431        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
432            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
433                data_source_exec,
434                extension_codec,
435            )? {
436                return Ok(node);
437            }
438        }
439
440        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
441            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
442                exec,
443                extension_codec,
444            );
445        }
446
447        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
448            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
449                exec,
450                extension_codec,
451            );
452        }
453
454        if let Some(exec) = plan.downcast_ref::<SortExec>() {
455            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
456        }
457
458        if let Some(union) = plan.downcast_ref::<UnionExec>() {
459            return protobuf::PhysicalPlanNode::try_from_union_exec(
460                union,
461                extension_codec,
462            );
463        }
464
465        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
466            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
467                interleave,
468                extension_codec,
469            );
470        }
471
472        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
473            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
474                exec,
475                extension_codec,
476            );
477        }
478
479        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
480            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
481                exec,
482                extension_codec,
483            );
484        }
485
486        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
487            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
488                exec,
489                extension_codec,
490            );
491        }
492
493        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
494            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
495                exec,
496                extension_codec,
497            );
498        }
499
500        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
501            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
502                exec,
503                extension_codec,
504            )? {
505                return Ok(node);
506            }
507        }
508
509        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
510            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
511                exec,
512                extension_codec,
513            );
514        }
515
516        let mut buf: Vec<u8> = vec![];
517        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
518            Ok(_) => {
519                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
520                    .children()
521                    .into_iter()
522                    .cloned()
523                    .map(|i| {
524                        protobuf::PhysicalPlanNode::try_from_physical_plan(
525                            i,
526                            extension_codec,
527                        )
528                    })
529                    .collect::<Result<_>>()?;
530
531                Ok(protobuf::PhysicalPlanNode {
532                    physical_plan_type: Some(PhysicalPlanType::Extension(
533                        protobuf::PhysicalExtensionNode { node: buf, inputs },
534                    )),
535                })
536            }
537            Err(e) => internal_err!(
538                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
539            ),
540        }
541    }
542}
543
544impl protobuf::PhysicalPlanNode {
545    fn try_into_explain_physical_plan(
546        &self,
547        explain: &protobuf::ExplainExecNode,
548        _registry: &dyn FunctionRegistry,
549        _runtime: &RuntimeEnv,
550        _extension_codec: &dyn PhysicalExtensionCodec,
551    ) -> Result<Arc<dyn ExecutionPlan>> {
552        Ok(Arc::new(ExplainExec::new(
553            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
554            explain
555                .stringified_plans
556                .iter()
557                .map(|plan| plan.into())
558                .collect(),
559            explain.verbose,
560        )))
561    }
562
563    fn try_into_projection_physical_plan(
564        &self,
565        projection: &protobuf::ProjectionExecNode,
566        registry: &dyn FunctionRegistry,
567        runtime: &RuntimeEnv,
568        extension_codec: &dyn PhysicalExtensionCodec,
569    ) -> Result<Arc<dyn ExecutionPlan>> {
570        let input: Arc<dyn ExecutionPlan> =
571            into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
572        let exprs = projection
573            .expr
574            .iter()
575            .zip(projection.expr_name.iter())
576            .map(|(expr, name)| {
577                Ok((
578                    parse_physical_expr(
579                        expr,
580                        registry,
581                        input.schema().as_ref(),
582                        extension_codec,
583                    )?,
584                    name.to_string(),
585                ))
586            })
587            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
588        Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
589    }
590
591    fn try_into_filter_physical_plan(
592        &self,
593        filter: &protobuf::FilterExecNode,
594        registry: &dyn FunctionRegistry,
595        runtime: &RuntimeEnv,
596        extension_codec: &dyn PhysicalExtensionCodec,
597    ) -> Result<Arc<dyn ExecutionPlan>> {
598        let input: Arc<dyn ExecutionPlan> =
599            into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
600        let predicate = filter
601            .expr
602            .as_ref()
603            .map(|expr| {
604                parse_physical_expr(
605                    expr,
606                    registry,
607                    input.schema().as_ref(),
608                    extension_codec,
609                )
610            })
611            .transpose()?
612            .ok_or_else(|| {
613                DataFusionError::Internal(
614                    "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
615                )
616            })?;
617        let filter_selectivity = filter.default_filter_selectivity.try_into();
618        let projection = if !filter.projection.is_empty() {
619            Some(
620                filter
621                    .projection
622                    .iter()
623                    .map(|i| *i as usize)
624                    .collect::<Vec<_>>(),
625            )
626        } else {
627            None
628        };
629        let filter =
630            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
631        match filter_selectivity {
632            Ok(filter_selectivity) => Ok(Arc::new(
633                filter.with_default_selectivity(filter_selectivity)?,
634            )),
635            Err(_) => Err(DataFusionError::Internal(
636                "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
637            )),
638        }
639    }
640
641    fn try_into_csv_scan_physical_plan(
642        &self,
643        scan: &protobuf::CsvScanExecNode,
644        registry: &dyn FunctionRegistry,
645        _runtime: &RuntimeEnv,
646        extension_codec: &dyn PhysicalExtensionCodec,
647    ) -> Result<Arc<dyn ExecutionPlan>> {
648        let escape =
649            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
650                &scan.optional_escape
651            {
652                Some(str_to_byte(escape, "escape")?)
653            } else {
654                None
655            };
656
657        let comment = if let Some(
658            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
659        ) = &scan.optional_comment
660        {
661            Some(str_to_byte(comment, "comment")?)
662        } else {
663            None
664        };
665
666        let source = Arc::new(
667            CsvSource::new(
668                scan.has_header,
669                str_to_byte(&scan.delimiter, "delimiter")?,
670                0,
671            )
672            .with_escape(escape)
673            .with_comment(comment),
674        );
675
676        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
677            scan.base_conf.as_ref().unwrap(),
678            registry,
679            extension_codec,
680            source,
681        )?)
682        .with_newlines_in_values(scan.newlines_in_values)
683        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
684        .build();
685        Ok(DataSourceExec::from_data_source(conf))
686    }
687
688    fn try_into_json_scan_physical_plan(
689        &self,
690        scan: &protobuf::JsonScanExecNode,
691        registry: &dyn FunctionRegistry,
692        _runtime: &RuntimeEnv,
693        extension_codec: &dyn PhysicalExtensionCodec,
694    ) -> Result<Arc<dyn ExecutionPlan>> {
695        let scan_conf = parse_protobuf_file_scan_config(
696            scan.base_conf.as_ref().unwrap(),
697            registry,
698            extension_codec,
699            Arc::new(JsonSource::new()),
700        )?;
701        Ok(DataSourceExec::from_data_source(scan_conf))
702    }
703
704    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
705    fn try_into_parquet_scan_physical_plan(
706        &self,
707        scan: &protobuf::ParquetScanExecNode,
708        registry: &dyn FunctionRegistry,
709        _runtime: &RuntimeEnv,
710        extension_codec: &dyn PhysicalExtensionCodec,
711    ) -> Result<Arc<dyn ExecutionPlan>> {
712        #[cfg(feature = "parquet")]
713        {
714            let schema =
715                parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
716            let predicate = scan
717                .predicate
718                .as_ref()
719                .map(|expr| {
720                    parse_physical_expr(expr, registry, schema.as_ref(), extension_codec)
721                })
722                .transpose()?;
723            let mut options = TableParquetOptions::default();
724
725            if let Some(table_options) = scan.parquet_options.as_ref() {
726                options = table_options.try_into()?;
727            }
728            let mut source = ParquetSource::new(options);
729
730            if let Some(predicate) = predicate {
731                source = source.with_predicate(Arc::clone(&schema), predicate);
732            }
733            let base_config = parse_protobuf_file_scan_config(
734                scan.base_conf.as_ref().unwrap(),
735                registry,
736                extension_codec,
737                Arc::new(source),
738            )?;
739            Ok(DataSourceExec::from_data_source(base_config))
740        }
741        #[cfg(not(feature = "parquet"))]
742        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
743    }
744
745    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
746    fn try_into_avro_scan_physical_plan(
747        &self,
748        scan: &protobuf::AvroScanExecNode,
749        registry: &dyn FunctionRegistry,
750        _runtime: &RuntimeEnv,
751        extension_codec: &dyn PhysicalExtensionCodec,
752    ) -> Result<Arc<dyn ExecutionPlan>> {
753        #[cfg(feature = "avro")]
754        {
755            let conf = parse_protobuf_file_scan_config(
756                scan.base_conf.as_ref().unwrap(),
757                registry,
758                extension_codec,
759                Arc::new(AvroSource::new()),
760            )?;
761            Ok(DataSourceExec::from_data_source(conf))
762        }
763        #[cfg(not(feature = "avro"))]
764        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
765    }
766
767    fn try_into_coalesce_batches_physical_plan(
768        &self,
769        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
770        registry: &dyn FunctionRegistry,
771        runtime: &RuntimeEnv,
772        extension_codec: &dyn PhysicalExtensionCodec,
773    ) -> Result<Arc<dyn ExecutionPlan>> {
774        let input: Arc<dyn ExecutionPlan> = into_physical_plan(
775            &coalesce_batches.input,
776            registry,
777            runtime,
778            extension_codec,
779        )?;
780        Ok(Arc::new(
781            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
782                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
783        ))
784    }
785
786    fn try_into_merge_physical_plan(
787        &self,
788        merge: &protobuf::CoalescePartitionsExecNode,
789        registry: &dyn FunctionRegistry,
790        runtime: &RuntimeEnv,
791        extension_codec: &dyn PhysicalExtensionCodec,
792    ) -> Result<Arc<dyn ExecutionPlan>> {
793        let input: Arc<dyn ExecutionPlan> =
794            into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
795        Ok(Arc::new(CoalescePartitionsExec::new(input)))
796    }
797
798    fn try_into_repartition_physical_plan(
799        &self,
800        repart: &protobuf::RepartitionExecNode,
801        registry: &dyn FunctionRegistry,
802        runtime: &RuntimeEnv,
803        extension_codec: &dyn PhysicalExtensionCodec,
804    ) -> Result<Arc<dyn ExecutionPlan>> {
805        let input: Arc<dyn ExecutionPlan> =
806            into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
807        let partitioning = parse_protobuf_partitioning(
808            repart.partitioning.as_ref(),
809            registry,
810            input.schema().as_ref(),
811            extension_codec,
812        )?;
813        Ok(Arc::new(RepartitionExec::try_new(
814            input,
815            partitioning.unwrap(),
816        )?))
817    }
818
819    fn try_into_global_limit_physical_plan(
820        &self,
821        limit: &protobuf::GlobalLimitExecNode,
822        registry: &dyn FunctionRegistry,
823        runtime: &RuntimeEnv,
824        extension_codec: &dyn PhysicalExtensionCodec,
825    ) -> Result<Arc<dyn ExecutionPlan>> {
826        let input: Arc<dyn ExecutionPlan> =
827            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
828        let fetch = if limit.fetch >= 0 {
829            Some(limit.fetch as usize)
830        } else {
831            None
832        };
833        Ok(Arc::new(GlobalLimitExec::new(
834            input,
835            limit.skip as usize,
836            fetch,
837        )))
838    }
839
840    fn try_into_local_limit_physical_plan(
841        &self,
842        limit: &protobuf::LocalLimitExecNode,
843        registry: &dyn FunctionRegistry,
844        runtime: &RuntimeEnv,
845        extension_codec: &dyn PhysicalExtensionCodec,
846    ) -> Result<Arc<dyn ExecutionPlan>> {
847        let input: Arc<dyn ExecutionPlan> =
848            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
849        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
850    }
851
852    fn try_into_window_physical_plan(
853        &self,
854        window_agg: &protobuf::WindowAggExecNode,
855        registry: &dyn FunctionRegistry,
856        runtime: &RuntimeEnv,
857        extension_codec: &dyn PhysicalExtensionCodec,
858    ) -> Result<Arc<dyn ExecutionPlan>> {
859        let input: Arc<dyn ExecutionPlan> =
860            into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
861        let input_schema = input.schema();
862
863        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
864            .window_expr
865            .iter()
866            .map(|window_expr| {
867                parse_physical_window_expr(
868                    window_expr,
869                    registry,
870                    input_schema.as_ref(),
871                    extension_codec,
872                )
873            })
874            .collect::<Result<Vec<_>, _>>()?;
875
876        let partition_keys = window_agg
877            .partition_keys
878            .iter()
879            .map(|expr| {
880                parse_physical_expr(
881                    expr,
882                    registry,
883                    input.schema().as_ref(),
884                    extension_codec,
885                )
886            })
887            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
888
889        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
890            let input_order_mode = match input_order_mode {
891                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
892                window_agg_exec_node::InputOrderMode::PartiallySorted(
893                    protobuf::PartiallySortedInputOrderMode { columns },
894                ) => InputOrderMode::PartiallySorted(
895                    columns.iter().map(|c| *c as usize).collect(),
896                ),
897                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
898            };
899
900            Ok(Arc::new(BoundedWindowAggExec::try_new(
901                physical_window_expr,
902                input,
903                input_order_mode,
904                !partition_keys.is_empty(),
905            )?))
906        } else {
907            Ok(Arc::new(WindowAggExec::try_new(
908                physical_window_expr,
909                input,
910                !partition_keys.is_empty(),
911            )?))
912        }
913    }
914
915    fn try_into_aggregate_physical_plan(
916        &self,
917        hash_agg: &protobuf::AggregateExecNode,
918        registry: &dyn FunctionRegistry,
919        runtime: &RuntimeEnv,
920        extension_codec: &dyn PhysicalExtensionCodec,
921    ) -> Result<Arc<dyn ExecutionPlan>> {
922        let input: Arc<dyn ExecutionPlan> =
923            into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
924        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
925            proto_error(format!(
926                "Received a AggregateNode message with unknown AggregateMode {}",
927                hash_agg.mode
928            ))
929        })?;
930        let agg_mode: AggregateMode = match mode {
931            protobuf::AggregateMode::Partial => AggregateMode::Partial,
932            protobuf::AggregateMode::Final => AggregateMode::Final,
933            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
934            protobuf::AggregateMode::Single => AggregateMode::Single,
935            protobuf::AggregateMode::SinglePartitioned => {
936                AggregateMode::SinglePartitioned
937            }
938        };
939
940        let num_expr = hash_agg.group_expr.len();
941
942        let group_expr = hash_agg
943            .group_expr
944            .iter()
945            .zip(hash_agg.group_expr_name.iter())
946            .map(|(expr, name)| {
947                parse_physical_expr(
948                    expr,
949                    registry,
950                    input.schema().as_ref(),
951                    extension_codec,
952                )
953                .map(|expr| (expr, name.to_string()))
954            })
955            .collect::<Result<Vec<_>, _>>()?;
956
957        let null_expr = hash_agg
958            .null_expr
959            .iter()
960            .zip(hash_agg.group_expr_name.iter())
961            .map(|(expr, name)| {
962                parse_physical_expr(
963                    expr,
964                    registry,
965                    input.schema().as_ref(),
966                    extension_codec,
967                )
968                .map(|expr| (expr, name.to_string()))
969            })
970            .collect::<Result<Vec<_>, _>>()?;
971
972        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
973            hash_agg
974                .groups
975                .chunks(num_expr)
976                .map(|g| g.to_vec())
977                .collect::<Vec<Vec<bool>>>()
978        } else {
979            vec![]
980        };
981
982        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
983            DataFusionError::Internal(
984                "input_schema in AggregateNode is missing.".to_owned(),
985            )
986        })?;
987        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
988
989        let physical_filter_expr = hash_agg
990            .filter_expr
991            .iter()
992            .map(|expr| {
993                expr.expr
994                    .as_ref()
995                    .map(|e| {
996                        parse_physical_expr(
997                            e,
998                            registry,
999                            &physical_schema,
1000                            extension_codec,
1001                        )
1002                    })
1003                    .transpose()
1004            })
1005            .collect::<Result<Vec<_>, _>>()?;
1006
1007        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1008            .aggr_expr
1009            .iter()
1010            .zip(hash_agg.aggr_expr_name.iter())
1011            .map(|(expr, name)| {
1012                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1013                    proto_error("Unexpected empty aggregate physical expression")
1014                })?;
1015
1016                match expr_type {
1017                    ExprType::AggregateExpr(agg_node) => {
1018                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1019                            .expr
1020                            .iter()
1021                            .map(|e| {
1022                                parse_physical_expr(
1023                                    e,
1024                                    registry,
1025                                    &physical_schema,
1026                                    extension_codec,
1027                                )
1028                            })
1029                            .collect::<Result<Vec<_>>>()?;
1030                        let ordering_req: LexOrdering = agg_node
1031                            .ordering_req
1032                            .iter()
1033                            .map(|e| {
1034                                parse_physical_sort_expr(
1035                                    e,
1036                                    registry,
1037                                    &physical_schema,
1038                                    extension_codec,
1039                                )
1040                            })
1041                            .collect::<Result<LexOrdering>>()?;
1042                        agg_node
1043                            .aggregate_function
1044                            .as_ref()
1045                            .map(|func| match func {
1046                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1047                                    let agg_udf = match &agg_node.fun_definition {
1048                                        Some(buf) => extension_codec
1049                                            .try_decode_udaf(udaf_name, buf)?,
1050                                        None => registry.udaf(udaf_name)?,
1051                                    };
1052
1053                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
1054                                        .schema(Arc::clone(&physical_schema))
1055                                        .alias(name)
1056                                        .with_ignore_nulls(agg_node.ignore_nulls)
1057                                        .with_distinct(agg_node.distinct)
1058                                        .order_by(ordering_req)
1059                                        .build()
1060                                        .map(Arc::new)
1061                                }
1062                            })
1063                            .transpose()?
1064                            .ok_or_else(|| {
1065                                proto_error(
1066                                    "Invalid AggregateExpr, missing aggregate_function",
1067                                )
1068                            })
1069                    }
1070                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1071                }
1072            })
1073            .collect::<Result<Vec<_>, _>>()?;
1074
1075        let limit = hash_agg
1076            .limit
1077            .as_ref()
1078            .map(|lit_value| lit_value.limit as usize);
1079
1080        let agg = AggregateExec::try_new(
1081            agg_mode,
1082            PhysicalGroupBy::new(group_expr, null_expr, groups),
1083            physical_aggr_expr,
1084            physical_filter_expr,
1085            input,
1086            physical_schema,
1087        )?;
1088
1089        let agg = agg.with_limit(limit);
1090
1091        Ok(Arc::new(agg))
1092    }
1093
1094    fn try_into_hash_join_physical_plan(
1095        &self,
1096        hashjoin: &protobuf::HashJoinExecNode,
1097        registry: &dyn FunctionRegistry,
1098        runtime: &RuntimeEnv,
1099        extension_codec: &dyn PhysicalExtensionCodec,
1100    ) -> Result<Arc<dyn ExecutionPlan>> {
1101        let left: Arc<dyn ExecutionPlan> =
1102            into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
1103        let right: Arc<dyn ExecutionPlan> =
1104            into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
1105        let left_schema = left.schema();
1106        let right_schema = right.schema();
1107        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1108            .on
1109            .iter()
1110            .map(|col| {
1111                let left = parse_physical_expr(
1112                    &col.left.clone().unwrap(),
1113                    registry,
1114                    left_schema.as_ref(),
1115                    extension_codec,
1116                )?;
1117                let right = parse_physical_expr(
1118                    &col.right.clone().unwrap(),
1119                    registry,
1120                    right_schema.as_ref(),
1121                    extension_codec,
1122                )?;
1123                Ok((left, right))
1124            })
1125            .collect::<Result<_>>()?;
1126        let join_type =
1127            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1128                proto_error(format!(
1129                    "Received a HashJoinNode message with unknown JoinType {}",
1130                    hashjoin.join_type
1131                ))
1132            })?;
1133        let filter = hashjoin
1134            .filter
1135            .as_ref()
1136            .map(|f| {
1137                let schema = f
1138                    .schema
1139                    .as_ref()
1140                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1141                    .try_into()?;
1142
1143                let expression = parse_physical_expr(
1144                    f.expression.as_ref().ok_or_else(|| {
1145                        proto_error("Unexpected empty filter expression")
1146                    })?,
1147                    registry, &schema,
1148                    extension_codec,
1149                )?;
1150                let column_indices = f.column_indices
1151                    .iter()
1152                    .map(|i| {
1153                        let side = protobuf::JoinSide::try_from(i.side)
1154                            .map_err(|_| proto_error(format!(
1155                                "Received a HashJoinNode message with JoinSide in Filter {}",
1156                                i.side))
1157                            )?;
1158
1159                        Ok(ColumnIndex {
1160                            index: i.index as usize,
1161                            side: side.into(),
1162                        })
1163                    })
1164                    .collect::<Result<Vec<_>>>()?;
1165
1166                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1167            })
1168            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1169
1170        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1171            .map_err(|_| {
1172            proto_error(format!(
1173                "Received a HashJoinNode message with unknown PartitionMode {}",
1174                hashjoin.partition_mode
1175            ))
1176        })?;
1177        let partition_mode = match partition_mode {
1178            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1179            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1180            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1181        };
1182        let projection = if !hashjoin.projection.is_empty() {
1183            Some(
1184                hashjoin
1185                    .projection
1186                    .iter()
1187                    .map(|i| *i as usize)
1188                    .collect::<Vec<_>>(),
1189            )
1190        } else {
1191            None
1192        };
1193        Ok(Arc::new(HashJoinExec::try_new(
1194            left,
1195            right,
1196            on,
1197            filter,
1198            &join_type.into(),
1199            projection,
1200            partition_mode,
1201            hashjoin.null_equals_null,
1202        )?))
1203    }
1204
1205    fn try_into_symmetric_hash_join_physical_plan(
1206        &self,
1207        sym_join: &protobuf::SymmetricHashJoinExecNode,
1208        registry: &dyn FunctionRegistry,
1209        runtime: &RuntimeEnv,
1210        extension_codec: &dyn PhysicalExtensionCodec,
1211    ) -> Result<Arc<dyn ExecutionPlan>> {
1212        let left =
1213            into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1214        let right =
1215            into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1216        let left_schema = left.schema();
1217        let right_schema = right.schema();
1218        let on = sym_join
1219            .on
1220            .iter()
1221            .map(|col| {
1222                let left = parse_physical_expr(
1223                    &col.left.clone().unwrap(),
1224                    registry,
1225                    left_schema.as_ref(),
1226                    extension_codec,
1227                )?;
1228                let right = parse_physical_expr(
1229                    &col.right.clone().unwrap(),
1230                    registry,
1231                    right_schema.as_ref(),
1232                    extension_codec,
1233                )?;
1234                Ok((left, right))
1235            })
1236            .collect::<Result<_>>()?;
1237        let join_type =
1238            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1239                proto_error(format!(
1240                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1241                    sym_join.join_type
1242                ))
1243            })?;
1244        let filter = sym_join
1245            .filter
1246            .as_ref()
1247            .map(|f| {
1248                let schema = f
1249                    .schema
1250                    .as_ref()
1251                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1252                    .try_into()?;
1253
1254                let expression = parse_physical_expr(
1255                    f.expression.as_ref().ok_or_else(|| {
1256                        proto_error("Unexpected empty filter expression")
1257                    })?,
1258                    registry, &schema,
1259                    extension_codec,
1260                )?;
1261                let column_indices = f.column_indices
1262                    .iter()
1263                    .map(|i| {
1264                        let side = protobuf::JoinSide::try_from(i.side)
1265                            .map_err(|_| proto_error(format!(
1266                                "Received a HashJoinNode message with JoinSide in Filter {}",
1267                                i.side))
1268                            )?;
1269
1270                        Ok(ColumnIndex {
1271                            index: i.index as usize,
1272                            side: side.into(),
1273                        })
1274                    })
1275                    .collect::<Result<_>>()?;
1276
1277                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1278            })
1279            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1280
1281        let left_sort_exprs = parse_physical_sort_exprs(
1282            &sym_join.left_sort_exprs,
1283            registry,
1284            &left_schema,
1285            extension_codec,
1286        )?;
1287        let left_sort_exprs = if left_sort_exprs.is_empty() {
1288            None
1289        } else {
1290            Some(left_sort_exprs)
1291        };
1292
1293        let right_sort_exprs = parse_physical_sort_exprs(
1294            &sym_join.right_sort_exprs,
1295            registry,
1296            &right_schema,
1297            extension_codec,
1298        )?;
1299        let right_sort_exprs = if right_sort_exprs.is_empty() {
1300            None
1301        } else {
1302            Some(right_sort_exprs)
1303        };
1304
1305        let partition_mode = protobuf::StreamPartitionMode::try_from(
1306            sym_join.partition_mode,
1307        )
1308        .map_err(|_| {
1309            proto_error(format!(
1310                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1311                sym_join.partition_mode
1312            ))
1313        })?;
1314        let partition_mode = match partition_mode {
1315            protobuf::StreamPartitionMode::SinglePartition => {
1316                StreamJoinPartitionMode::SinglePartition
1317            }
1318            protobuf::StreamPartitionMode::PartitionedExec => {
1319                StreamJoinPartitionMode::Partitioned
1320            }
1321        };
1322        SymmetricHashJoinExec::try_new(
1323            left,
1324            right,
1325            on,
1326            filter,
1327            &join_type.into(),
1328            sym_join.null_equals_null,
1329            left_sort_exprs,
1330            right_sort_exprs,
1331            partition_mode,
1332        )
1333        .map(|e| Arc::new(e) as _)
1334    }
1335
1336    fn try_into_union_physical_plan(
1337        &self,
1338        union: &protobuf::UnionExecNode,
1339        registry: &dyn FunctionRegistry,
1340        runtime: &RuntimeEnv,
1341        extension_codec: &dyn PhysicalExtensionCodec,
1342    ) -> Result<Arc<dyn ExecutionPlan>> {
1343        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1344        for input in &union.inputs {
1345            inputs.push(input.try_into_physical_plan(
1346                registry,
1347                runtime,
1348                extension_codec,
1349            )?);
1350        }
1351        Ok(Arc::new(UnionExec::new(inputs)))
1352    }
1353
1354    fn try_into_interleave_physical_plan(
1355        &self,
1356        interleave: &protobuf::InterleaveExecNode,
1357        registry: &dyn FunctionRegistry,
1358        runtime: &RuntimeEnv,
1359        extension_codec: &dyn PhysicalExtensionCodec,
1360    ) -> Result<Arc<dyn ExecutionPlan>> {
1361        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1362        for input in &interleave.inputs {
1363            inputs.push(input.try_into_physical_plan(
1364                registry,
1365                runtime,
1366                extension_codec,
1367            )?);
1368        }
1369        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1370    }
1371
1372    fn try_into_cross_join_physical_plan(
1373        &self,
1374        crossjoin: &protobuf::CrossJoinExecNode,
1375        registry: &dyn FunctionRegistry,
1376        runtime: &RuntimeEnv,
1377        extension_codec: &dyn PhysicalExtensionCodec,
1378    ) -> Result<Arc<dyn ExecutionPlan>> {
1379        let left: Arc<dyn ExecutionPlan> =
1380            into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1381        let right: Arc<dyn ExecutionPlan> =
1382            into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1383        Ok(Arc::new(CrossJoinExec::new(left, right)))
1384    }
1385
1386    fn try_into_empty_physical_plan(
1387        &self,
1388        empty: &protobuf::EmptyExecNode,
1389        _registry: &dyn FunctionRegistry,
1390        _runtime: &RuntimeEnv,
1391        _extension_codec: &dyn PhysicalExtensionCodec,
1392    ) -> Result<Arc<dyn ExecutionPlan>> {
1393        let schema = Arc::new(convert_required!(empty.schema)?);
1394        Ok(Arc::new(EmptyExec::new(schema)))
1395    }
1396
1397    fn try_into_placeholder_row_physical_plan(
1398        &self,
1399        placeholder: &protobuf::PlaceholderRowExecNode,
1400        _registry: &dyn FunctionRegistry,
1401        _runtime: &RuntimeEnv,
1402        _extension_codec: &dyn PhysicalExtensionCodec,
1403    ) -> Result<Arc<dyn ExecutionPlan>> {
1404        let schema = Arc::new(convert_required!(placeholder.schema)?);
1405        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1406    }
1407
1408    fn try_into_sort_physical_plan(
1409        &self,
1410        sort: &protobuf::SortExecNode,
1411        registry: &dyn FunctionRegistry,
1412        runtime: &RuntimeEnv,
1413        extension_codec: &dyn PhysicalExtensionCodec,
1414    ) -> Result<Arc<dyn ExecutionPlan>> {
1415        let input: Arc<dyn ExecutionPlan> =
1416            into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1417        let exprs = sort
1418                    .expr
1419                    .iter()
1420                    .map(|expr| {
1421                        let expr = expr.expr_type.as_ref().ok_or_else(|| {
1422                            proto_error(format!(
1423                                "physical_plan::from_proto() Unexpected expr {self:?}"
1424                            ))
1425                        })?;
1426                        if let ExprType::Sort(sort_expr) = expr {
1427                            let expr = sort_expr
1428                                .expr
1429                                .as_ref()
1430                                .ok_or_else(|| {
1431                                    proto_error(format!(
1432                                        "physical_plan::from_proto() Unexpected sort expr {self:?}"
1433                                    ))
1434                                })?
1435                                .as_ref();
1436                            Ok(PhysicalSortExpr {
1437                                expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
1438                                options: SortOptions {
1439                                    descending: !sort_expr.asc,
1440                                    nulls_first: sort_expr.nulls_first,
1441                                },
1442                            })
1443                        } else {
1444                            internal_err!(
1445                                "physical_plan::from_proto() {self:?}"
1446                            )
1447                        }
1448                    })
1449                    .collect::<Result<LexOrdering, _>>()?;
1450        let fetch = if sort.fetch < 0 {
1451            None
1452        } else {
1453            Some(sort.fetch as usize)
1454        };
1455        let new_sort = SortExec::new(exprs, input)
1456            .with_fetch(fetch)
1457            .with_preserve_partitioning(sort.preserve_partitioning);
1458
1459        Ok(Arc::new(new_sort))
1460    }
1461
1462    fn try_into_sort_preserving_merge_physical_plan(
1463        &self,
1464        sort: &protobuf::SortPreservingMergeExecNode,
1465        registry: &dyn FunctionRegistry,
1466        runtime: &RuntimeEnv,
1467        extension_codec: &dyn PhysicalExtensionCodec,
1468    ) -> Result<Arc<dyn ExecutionPlan>> {
1469        let input: Arc<dyn ExecutionPlan> =
1470            into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1471        let exprs = sort
1472            .expr
1473            .iter()
1474            .map(|expr| {
1475                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1476                    proto_error(format!(
1477                        "physical_plan::from_proto() Unexpected expr {self:?}"
1478                    ))
1479                })?;
1480                if let ExprType::Sort(sort_expr) = expr {
1481                    let expr = sort_expr
1482                        .expr
1483                        .as_ref()
1484                        .ok_or_else(|| {
1485                            proto_error(format!(
1486                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1487                        ))
1488                        })?
1489                        .as_ref();
1490                    Ok(PhysicalSortExpr {
1491                        expr: parse_physical_expr(
1492                            expr,
1493                            registry,
1494                            input.schema().as_ref(),
1495                            extension_codec,
1496                        )?,
1497                        options: SortOptions {
1498                            descending: !sort_expr.asc,
1499                            nulls_first: sort_expr.nulls_first,
1500                        },
1501                    })
1502                } else {
1503                    internal_err!("physical_plan::from_proto() {self:?}")
1504                }
1505            })
1506            .collect::<Result<LexOrdering, _>>()?;
1507        let fetch = if sort.fetch < 0 {
1508            None
1509        } else {
1510            Some(sort.fetch as usize)
1511        };
1512        Ok(Arc::new(
1513            SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
1514        ))
1515    }
1516
1517    fn try_into_extension_physical_plan(
1518        &self,
1519        extension: &protobuf::PhysicalExtensionNode,
1520        registry: &dyn FunctionRegistry,
1521        runtime: &RuntimeEnv,
1522        extension_codec: &dyn PhysicalExtensionCodec,
1523    ) -> Result<Arc<dyn ExecutionPlan>> {
1524        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1525            .inputs
1526            .iter()
1527            .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1528            .collect::<Result<_>>()?;
1529
1530        let extension_node =
1531            extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1532
1533        Ok(extension_node)
1534    }
1535
1536    fn try_into_nested_loop_join_physical_plan(
1537        &self,
1538        join: &protobuf::NestedLoopJoinExecNode,
1539        registry: &dyn FunctionRegistry,
1540        runtime: &RuntimeEnv,
1541        extension_codec: &dyn PhysicalExtensionCodec,
1542    ) -> Result<Arc<dyn ExecutionPlan>> {
1543        let left: Arc<dyn ExecutionPlan> =
1544            into_physical_plan(&join.left, registry, runtime, extension_codec)?;
1545        let right: Arc<dyn ExecutionPlan> =
1546            into_physical_plan(&join.right, registry, runtime, extension_codec)?;
1547        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1548            proto_error(format!(
1549                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1550                join.join_type
1551            ))
1552        })?;
1553        let filter = join
1554                    .filter
1555                    .as_ref()
1556                    .map(|f| {
1557                        let schema = f
1558                            .schema
1559                            .as_ref()
1560                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1561                            .try_into()?;
1562
1563                        let expression = parse_physical_expr(
1564                            f.expression.as_ref().ok_or_else(|| {
1565                                proto_error("Unexpected empty filter expression")
1566                            })?,
1567                            registry, &schema,
1568                            extension_codec,
1569                        )?;
1570                        let column_indices = f.column_indices
1571                            .iter()
1572                            .map(|i| {
1573                                let side = protobuf::JoinSide::try_from(i.side)
1574                                    .map_err(|_| proto_error(format!(
1575                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1576                                        i.side))
1577                                    )?;
1578
1579                                Ok(ColumnIndex {
1580                                    index: i.index as usize,
1581                                    side: side.into(),
1582                                })
1583                            })
1584                            .collect::<Result<Vec<_>>>()?;
1585
1586                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1587                    })
1588                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1589
1590        let projection = if !join.projection.is_empty() {
1591            Some(
1592                join.projection
1593                    .iter()
1594                    .map(|i| *i as usize)
1595                    .collect::<Vec<_>>(),
1596            )
1597        } else {
1598            None
1599        };
1600
1601        Ok(Arc::new(NestedLoopJoinExec::try_new(
1602            left,
1603            right,
1604            filter,
1605            &join_type.into(),
1606            projection,
1607        )?))
1608    }
1609
1610    fn try_into_analyze_physical_plan(
1611        &self,
1612        analyze: &protobuf::AnalyzeExecNode,
1613        registry: &dyn FunctionRegistry,
1614        runtime: &RuntimeEnv,
1615        extension_codec: &dyn PhysicalExtensionCodec,
1616    ) -> Result<Arc<dyn ExecutionPlan>> {
1617        let input: Arc<dyn ExecutionPlan> =
1618            into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1619        Ok(Arc::new(AnalyzeExec::new(
1620            analyze.verbose,
1621            analyze.show_statistics,
1622            input,
1623            Arc::new(convert_required!(analyze.schema)?),
1624        )))
1625    }
1626
1627    fn try_into_json_sink_physical_plan(
1628        &self,
1629        sink: &protobuf::JsonSinkExecNode,
1630        registry: &dyn FunctionRegistry,
1631        runtime: &RuntimeEnv,
1632        extension_codec: &dyn PhysicalExtensionCodec,
1633    ) -> Result<Arc<dyn ExecutionPlan>> {
1634        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1635
1636        let data_sink: JsonSink = sink
1637            .sink
1638            .as_ref()
1639            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1640            .try_into()?;
1641        let sink_schema = input.schema();
1642        let sort_order = sink
1643            .sort_order
1644            .as_ref()
1645            .map(|collection| {
1646                parse_physical_sort_exprs(
1647                    &collection.physical_sort_expr_nodes,
1648                    registry,
1649                    &sink_schema,
1650                    extension_codec,
1651                )
1652                .map(LexRequirement::from)
1653            })
1654            .transpose()?;
1655        Ok(Arc::new(DataSinkExec::new(
1656            input,
1657            Arc::new(data_sink),
1658            sort_order,
1659        )))
1660    }
1661
1662    fn try_into_csv_sink_physical_plan(
1663        &self,
1664        sink: &protobuf::CsvSinkExecNode,
1665        registry: &dyn FunctionRegistry,
1666        runtime: &RuntimeEnv,
1667        extension_codec: &dyn PhysicalExtensionCodec,
1668    ) -> Result<Arc<dyn ExecutionPlan>> {
1669        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1670
1671        let data_sink: CsvSink = sink
1672            .sink
1673            .as_ref()
1674            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1675            .try_into()?;
1676        let sink_schema = input.schema();
1677        let sort_order = sink
1678            .sort_order
1679            .as_ref()
1680            .map(|collection| {
1681                parse_physical_sort_exprs(
1682                    &collection.physical_sort_expr_nodes,
1683                    registry,
1684                    &sink_schema,
1685                    extension_codec,
1686                )
1687                .map(LexRequirement::from)
1688            })
1689            .transpose()?;
1690        Ok(Arc::new(DataSinkExec::new(
1691            input,
1692            Arc::new(data_sink),
1693            sort_order,
1694        )))
1695    }
1696
1697    fn try_into_parquet_sink_physical_plan(
1698        &self,
1699        sink: &protobuf::ParquetSinkExecNode,
1700        registry: &dyn FunctionRegistry,
1701        runtime: &RuntimeEnv,
1702        extension_codec: &dyn PhysicalExtensionCodec,
1703    ) -> Result<Arc<dyn ExecutionPlan>> {
1704        #[cfg(feature = "parquet")]
1705        {
1706            let input =
1707                into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1708
1709            let data_sink: ParquetSink = sink
1710                .sink
1711                .as_ref()
1712                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1713                .try_into()?;
1714            let sink_schema = input.schema();
1715            let sort_order = sink
1716                .sort_order
1717                .as_ref()
1718                .map(|collection| {
1719                    parse_physical_sort_exprs(
1720                        &collection.physical_sort_expr_nodes,
1721                        registry,
1722                        &sink_schema,
1723                        extension_codec,
1724                    )
1725                    .map(LexRequirement::from)
1726                })
1727                .transpose()?;
1728            Ok(Arc::new(DataSinkExec::new(
1729                input,
1730                Arc::new(data_sink),
1731                sort_order,
1732            )))
1733        }
1734        #[cfg(not(feature = "parquet"))]
1735        panic!("Trying to use ParquetSink without `parquet` feature enabled");
1736    }
1737
1738    fn try_into_unnest_physical_plan(
1739        &self,
1740        unnest: &protobuf::UnnestExecNode,
1741        registry: &dyn FunctionRegistry,
1742        runtime: &RuntimeEnv,
1743        extension_codec: &dyn PhysicalExtensionCodec,
1744    ) -> Result<Arc<dyn ExecutionPlan>> {
1745        let input =
1746            into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1747
1748        Ok(Arc::new(UnnestExec::new(
1749            input,
1750            unnest
1751                .list_type_columns
1752                .iter()
1753                .map(|c| ListUnnest {
1754                    index_in_input_schema: c.index_in_input_schema as _,
1755                    depth: c.depth as _,
1756                })
1757                .collect(),
1758            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1759            Arc::new(convert_required!(unnest.schema)?),
1760            into_required!(unnest.options)?,
1761        )))
1762    }
1763
1764    fn try_from_explain_exec(
1765        exec: &ExplainExec,
1766        _extension_codec: &dyn PhysicalExtensionCodec,
1767    ) -> Result<Self> {
1768        Ok(protobuf::PhysicalPlanNode {
1769            physical_plan_type: Some(PhysicalPlanType::Explain(
1770                protobuf::ExplainExecNode {
1771                    schema: Some(exec.schema().as_ref().try_into()?),
1772                    stringified_plans: exec
1773                        .stringified_plans()
1774                        .iter()
1775                        .map(|plan| plan.into())
1776                        .collect(),
1777                    verbose: exec.verbose(),
1778                },
1779            )),
1780        })
1781    }
1782
1783    fn try_from_projection_exec(
1784        exec: &ProjectionExec,
1785        extension_codec: &dyn PhysicalExtensionCodec,
1786    ) -> Result<Self> {
1787        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1788            exec.input().to_owned(),
1789            extension_codec,
1790        )?;
1791        let expr = exec
1792            .expr()
1793            .iter()
1794            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1795            .collect::<Result<Vec<_>>>()?;
1796        let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1797        Ok(protobuf::PhysicalPlanNode {
1798            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1799                protobuf::ProjectionExecNode {
1800                    input: Some(Box::new(input)),
1801                    expr,
1802                    expr_name,
1803                },
1804            ))),
1805        })
1806    }
1807
1808    fn try_from_analyze_exec(
1809        exec: &AnalyzeExec,
1810        extension_codec: &dyn PhysicalExtensionCodec,
1811    ) -> Result<Self> {
1812        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1813            exec.input().to_owned(),
1814            extension_codec,
1815        )?;
1816        Ok(protobuf::PhysicalPlanNode {
1817            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1818                protobuf::AnalyzeExecNode {
1819                    verbose: exec.verbose(),
1820                    show_statistics: exec.show_statistics(),
1821                    input: Some(Box::new(input)),
1822                    schema: Some(exec.schema().as_ref().try_into()?),
1823                },
1824            ))),
1825        })
1826    }
1827
1828    fn try_from_filter_exec(
1829        exec: &FilterExec,
1830        extension_codec: &dyn PhysicalExtensionCodec,
1831    ) -> Result<Self> {
1832        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1833            exec.input().to_owned(),
1834            extension_codec,
1835        )?;
1836        Ok(protobuf::PhysicalPlanNode {
1837            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1838                protobuf::FilterExecNode {
1839                    input: Some(Box::new(input)),
1840                    expr: Some(serialize_physical_expr(
1841                        exec.predicate(),
1842                        extension_codec,
1843                    )?),
1844                    default_filter_selectivity: exec.default_selectivity() as u32,
1845                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1846                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1847                    }),
1848                },
1849            ))),
1850        })
1851    }
1852
1853    fn try_from_global_limit_exec(
1854        limit: &GlobalLimitExec,
1855        extension_codec: &dyn PhysicalExtensionCodec,
1856    ) -> Result<Self> {
1857        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1858            limit.input().to_owned(),
1859            extension_codec,
1860        )?;
1861
1862        Ok(protobuf::PhysicalPlanNode {
1863            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1864                protobuf::GlobalLimitExecNode {
1865                    input: Some(Box::new(input)),
1866                    skip: limit.skip() as u32,
1867                    fetch: match limit.fetch() {
1868                        Some(n) => n as i64,
1869                        _ => -1, // no limit
1870                    },
1871                },
1872            ))),
1873        })
1874    }
1875
1876    fn try_from_local_limit_exec(
1877        limit: &LocalLimitExec,
1878        extension_codec: &dyn PhysicalExtensionCodec,
1879    ) -> Result<Self> {
1880        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1881            limit.input().to_owned(),
1882            extension_codec,
1883        )?;
1884        Ok(protobuf::PhysicalPlanNode {
1885            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1886                protobuf::LocalLimitExecNode {
1887                    input: Some(Box::new(input)),
1888                    fetch: limit.fetch() as u32,
1889                },
1890            ))),
1891        })
1892    }
1893
1894    fn try_from_hash_join_exec(
1895        exec: &HashJoinExec,
1896        extension_codec: &dyn PhysicalExtensionCodec,
1897    ) -> Result<Self> {
1898        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1899            exec.left().to_owned(),
1900            extension_codec,
1901        )?;
1902        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1903            exec.right().to_owned(),
1904            extension_codec,
1905        )?;
1906        let on: Vec<protobuf::JoinOn> = exec
1907            .on()
1908            .iter()
1909            .map(|tuple| {
1910                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1911                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1912                Ok::<_, DataFusionError>(protobuf::JoinOn {
1913                    left: Some(l),
1914                    right: Some(r),
1915                })
1916            })
1917            .collect::<Result<_>>()?;
1918        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1919        let filter = exec
1920            .filter()
1921            .as_ref()
1922            .map(|f| {
1923                let expression =
1924                    serialize_physical_expr(f.expression(), extension_codec)?;
1925                let column_indices = f
1926                    .column_indices()
1927                    .iter()
1928                    .map(|i| {
1929                        let side: protobuf::JoinSide = i.side.to_owned().into();
1930                        protobuf::ColumnIndex {
1931                            index: i.index as u32,
1932                            side: side.into(),
1933                        }
1934                    })
1935                    .collect();
1936                let schema = f.schema().as_ref().try_into()?;
1937                Ok(protobuf::JoinFilter {
1938                    expression: Some(expression),
1939                    column_indices,
1940                    schema: Some(schema),
1941                })
1942            })
1943            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
1944
1945        let partition_mode = match exec.partition_mode() {
1946            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
1947            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
1948            PartitionMode::Auto => protobuf::PartitionMode::Auto,
1949        };
1950
1951        Ok(protobuf::PhysicalPlanNode {
1952            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
1953                protobuf::HashJoinExecNode {
1954                    left: Some(Box::new(left)),
1955                    right: Some(Box::new(right)),
1956                    on,
1957                    join_type: join_type.into(),
1958                    partition_mode: partition_mode.into(),
1959                    null_equals_null: exec.null_equals_null(),
1960                    filter,
1961                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
1962                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1963                    }),
1964                },
1965            ))),
1966        })
1967    }
1968
1969    fn try_from_symmetric_hash_join_exec(
1970        exec: &SymmetricHashJoinExec,
1971        extension_codec: &dyn PhysicalExtensionCodec,
1972    ) -> Result<Self> {
1973        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1974            exec.left().to_owned(),
1975            extension_codec,
1976        )?;
1977        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1978            exec.right().to_owned(),
1979            extension_codec,
1980        )?;
1981        let on = exec
1982            .on()
1983            .iter()
1984            .map(|tuple| {
1985                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1986                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1987                Ok::<_, DataFusionError>(protobuf::JoinOn {
1988                    left: Some(l),
1989                    right: Some(r),
1990                })
1991            })
1992            .collect::<Result<_>>()?;
1993        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1994        let filter = exec
1995            .filter()
1996            .as_ref()
1997            .map(|f| {
1998                let expression =
1999                    serialize_physical_expr(f.expression(), extension_codec)?;
2000                let column_indices = f
2001                    .column_indices()
2002                    .iter()
2003                    .map(|i| {
2004                        let side: protobuf::JoinSide = i.side.to_owned().into();
2005                        protobuf::ColumnIndex {
2006                            index: i.index as u32,
2007                            side: side.into(),
2008                        }
2009                    })
2010                    .collect();
2011                let schema = f.schema().as_ref().try_into()?;
2012                Ok(protobuf::JoinFilter {
2013                    expression: Some(expression),
2014                    column_indices,
2015                    schema: Some(schema),
2016                })
2017            })
2018            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2019
2020        let partition_mode = match exec.partition_mode() {
2021            StreamJoinPartitionMode::SinglePartition => {
2022                protobuf::StreamPartitionMode::SinglePartition
2023            }
2024            StreamJoinPartitionMode::Partitioned => {
2025                protobuf::StreamPartitionMode::PartitionedExec
2026            }
2027        };
2028
2029        let left_sort_exprs = exec
2030            .left_sort_exprs()
2031            .map(|exprs| {
2032                exprs
2033                    .iter()
2034                    .map(|expr| {
2035                        Ok(protobuf::PhysicalSortExprNode {
2036                            expr: Some(Box::new(serialize_physical_expr(
2037                                &expr.expr,
2038                                extension_codec,
2039                            )?)),
2040                            asc: !expr.options.descending,
2041                            nulls_first: expr.options.nulls_first,
2042                        })
2043                    })
2044                    .collect::<Result<Vec<_>>>()
2045            })
2046            .transpose()?
2047            .unwrap_or(vec![]);
2048
2049        let right_sort_exprs = exec
2050            .right_sort_exprs()
2051            .map(|exprs| {
2052                exprs
2053                    .iter()
2054                    .map(|expr| {
2055                        Ok(protobuf::PhysicalSortExprNode {
2056                            expr: Some(Box::new(serialize_physical_expr(
2057                                &expr.expr,
2058                                extension_codec,
2059                            )?)),
2060                            asc: !expr.options.descending,
2061                            nulls_first: expr.options.nulls_first,
2062                        })
2063                    })
2064                    .collect::<Result<Vec<_>>>()
2065            })
2066            .transpose()?
2067            .unwrap_or(vec![]);
2068
2069        Ok(protobuf::PhysicalPlanNode {
2070            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2071                protobuf::SymmetricHashJoinExecNode {
2072                    left: Some(Box::new(left)),
2073                    right: Some(Box::new(right)),
2074                    on,
2075                    join_type: join_type.into(),
2076                    partition_mode: partition_mode.into(),
2077                    null_equals_null: exec.null_equals_null(),
2078                    left_sort_exprs,
2079                    right_sort_exprs,
2080                    filter,
2081                },
2082            ))),
2083        })
2084    }
2085
2086    fn try_from_cross_join_exec(
2087        exec: &CrossJoinExec,
2088        extension_codec: &dyn PhysicalExtensionCodec,
2089    ) -> Result<Self> {
2090        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2091            exec.left().to_owned(),
2092            extension_codec,
2093        )?;
2094        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2095            exec.right().to_owned(),
2096            extension_codec,
2097        )?;
2098        Ok(protobuf::PhysicalPlanNode {
2099            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2100                protobuf::CrossJoinExecNode {
2101                    left: Some(Box::new(left)),
2102                    right: Some(Box::new(right)),
2103                },
2104            ))),
2105        })
2106    }
2107
2108    fn try_from_aggregate_exec(
2109        exec: &AggregateExec,
2110        extension_codec: &dyn PhysicalExtensionCodec,
2111    ) -> Result<Self> {
2112        let groups: Vec<bool> = exec
2113            .group_expr()
2114            .groups()
2115            .iter()
2116            .flatten()
2117            .copied()
2118            .collect();
2119
2120        let group_names = exec
2121            .group_expr()
2122            .expr()
2123            .iter()
2124            .map(|expr| expr.1.to_owned())
2125            .collect();
2126
2127        let filter = exec
2128            .filter_expr()
2129            .iter()
2130            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2131            .collect::<Result<Vec<_>>>()?;
2132
2133        let agg = exec
2134            .aggr_expr()
2135            .iter()
2136            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2137            .collect::<Result<Vec<_>>>()?;
2138
2139        let agg_names = exec
2140            .aggr_expr()
2141            .iter()
2142            .map(|expr| expr.name().to_string())
2143            .collect::<Vec<_>>();
2144
2145        let agg_mode = match exec.mode() {
2146            AggregateMode::Partial => protobuf::AggregateMode::Partial,
2147            AggregateMode::Final => protobuf::AggregateMode::Final,
2148            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2149            AggregateMode::Single => protobuf::AggregateMode::Single,
2150            AggregateMode::SinglePartitioned => {
2151                protobuf::AggregateMode::SinglePartitioned
2152            }
2153        };
2154        let input_schema = exec.input_schema();
2155        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2156            exec.input().to_owned(),
2157            extension_codec,
2158        )?;
2159
2160        let null_expr = exec
2161            .group_expr()
2162            .null_expr()
2163            .iter()
2164            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2165            .collect::<Result<Vec<_>>>()?;
2166
2167        let group_expr = exec
2168            .group_expr()
2169            .expr()
2170            .iter()
2171            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2172            .collect::<Result<Vec<_>>>()?;
2173
2174        let limit = exec.limit().map(|value| protobuf::AggLimit {
2175            limit: value as u64,
2176        });
2177
2178        Ok(protobuf::PhysicalPlanNode {
2179            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2180                protobuf::AggregateExecNode {
2181                    group_expr,
2182                    group_expr_name: group_names,
2183                    aggr_expr: agg,
2184                    filter_expr: filter,
2185                    aggr_expr_name: agg_names,
2186                    mode: agg_mode as i32,
2187                    input: Some(Box::new(input)),
2188                    input_schema: Some(input_schema.as_ref().try_into()?),
2189                    null_expr,
2190                    groups,
2191                    limit,
2192                },
2193            ))),
2194        })
2195    }
2196
2197    fn try_from_empty_exec(
2198        empty: &EmptyExec,
2199        _extension_codec: &dyn PhysicalExtensionCodec,
2200    ) -> Result<Self> {
2201        let schema = empty.schema().as_ref().try_into()?;
2202        Ok(protobuf::PhysicalPlanNode {
2203            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2204                schema: Some(schema),
2205            })),
2206        })
2207    }
2208
2209    fn try_from_placeholder_row_exec(
2210        empty: &PlaceholderRowExec,
2211        _extension_codec: &dyn PhysicalExtensionCodec,
2212    ) -> Result<Self> {
2213        let schema = empty.schema().as_ref().try_into()?;
2214        Ok(protobuf::PhysicalPlanNode {
2215            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2216                protobuf::PlaceholderRowExecNode {
2217                    schema: Some(schema),
2218                },
2219            )),
2220        })
2221    }
2222
2223    fn try_from_coalesce_batches_exec(
2224        coalesce_batches: &CoalesceBatchesExec,
2225        extension_codec: &dyn PhysicalExtensionCodec,
2226    ) -> Result<Self> {
2227        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2228            coalesce_batches.input().to_owned(),
2229            extension_codec,
2230        )?;
2231        Ok(protobuf::PhysicalPlanNode {
2232            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2233                protobuf::CoalesceBatchesExecNode {
2234                    input: Some(Box::new(input)),
2235                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2236                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2237                },
2238            ))),
2239        })
2240    }
2241
2242    fn try_from_data_source_exec(
2243        data_source_exec: &DataSourceExec,
2244        extension_codec: &dyn PhysicalExtensionCodec,
2245    ) -> Result<Option<Self>> {
2246        let data_source = data_source_exec.data_source();
2247        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2248            let source = maybe_csv.file_source();
2249            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2250                return Ok(Some(protobuf::PhysicalPlanNode {
2251                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
2252                        protobuf::CsvScanExecNode {
2253                            base_conf: Some(serialize_file_scan_config(
2254                                maybe_csv,
2255                                extension_codec,
2256                            )?),
2257                            has_header: csv_config.has_header(),
2258                            delimiter: byte_to_string(
2259                                csv_config.delimiter(),
2260                                "delimiter",
2261                            )?,
2262                            quote: byte_to_string(csv_config.quote(), "quote")?,
2263                            optional_escape: if let Some(escape) = csv_config.escape() {
2264                                Some(
2265                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2266                                        byte_to_string(escape, "escape")?,
2267                                    ),
2268                                )
2269                            } else {
2270                                None
2271                            },
2272                            optional_comment: if let Some(comment) = csv_config.comment()
2273                            {
2274                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2275                                        byte_to_string(comment, "comment")?,
2276                                    ))
2277                            } else {
2278                                None
2279                            },
2280                            newlines_in_values: maybe_csv.newlines_in_values(),
2281                        },
2282                    )),
2283                }));
2284            }
2285        }
2286
2287        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2288            let source = scan_conf.file_source();
2289            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2290                return Ok(Some(protobuf::PhysicalPlanNode {
2291                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
2292                        protobuf::JsonScanExecNode {
2293                            base_conf: Some(serialize_file_scan_config(
2294                                scan_conf,
2295                                extension_codec,
2296                            )?),
2297                        },
2298                    )),
2299                }));
2300            }
2301        }
2302
2303        #[cfg(feature = "parquet")]
2304        if let Some((maybe_parquet, conf)) =
2305            data_source_exec.downcast_to_file_source::<ParquetSource>()
2306        {
2307            let predicate = conf
2308                .predicate()
2309                .map(|pred| serialize_physical_expr(pred, extension_codec))
2310                .transpose()?;
2311            return Ok(Some(protobuf::PhysicalPlanNode {
2312                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2313                    protobuf::ParquetScanExecNode {
2314                        base_conf: Some(serialize_file_scan_config(
2315                            maybe_parquet,
2316                            extension_codec,
2317                        )?),
2318                        predicate,
2319                        parquet_options: Some(conf.table_parquet_options().try_into()?),
2320                    },
2321                )),
2322            }));
2323        }
2324
2325        #[cfg(feature = "avro")]
2326        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2327            let source = maybe_avro.file_source();
2328            if source.as_any().downcast_ref::<AvroSource>().is_some() {
2329                return Ok(Some(protobuf::PhysicalPlanNode {
2330                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
2331                        protobuf::AvroScanExecNode {
2332                            base_conf: Some(serialize_file_scan_config(
2333                                maybe_avro,
2334                                extension_codec,
2335                            )?),
2336                        },
2337                    )),
2338                }));
2339            }
2340        }
2341
2342        Ok(None)
2343    }
2344
2345    fn try_from_coalesce_partitions_exec(
2346        exec: &CoalescePartitionsExec,
2347        extension_codec: &dyn PhysicalExtensionCodec,
2348    ) -> Result<Self> {
2349        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2350            exec.input().to_owned(),
2351            extension_codec,
2352        )?;
2353        Ok(protobuf::PhysicalPlanNode {
2354            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2355                protobuf::CoalescePartitionsExecNode {
2356                    input: Some(Box::new(input)),
2357                },
2358            ))),
2359        })
2360    }
2361
2362    fn try_from_repartition_exec(
2363        exec: &RepartitionExec,
2364        extension_codec: &dyn PhysicalExtensionCodec,
2365    ) -> Result<Self> {
2366        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2367            exec.input().to_owned(),
2368            extension_codec,
2369        )?;
2370
2371        let pb_partitioning =
2372            serialize_partitioning(exec.partitioning(), extension_codec)?;
2373
2374        Ok(protobuf::PhysicalPlanNode {
2375            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2376                protobuf::RepartitionExecNode {
2377                    input: Some(Box::new(input)),
2378                    partitioning: Some(pb_partitioning),
2379                },
2380            ))),
2381        })
2382    }
2383
2384    fn try_from_sort_exec(
2385        exec: &SortExec,
2386        extension_codec: &dyn PhysicalExtensionCodec,
2387    ) -> Result<Self> {
2388        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2389            exec.input().to_owned(),
2390            extension_codec,
2391        )?;
2392        let expr = exec
2393            .expr()
2394            .iter()
2395            .map(|expr| {
2396                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2397                    expr: Some(Box::new(serialize_physical_expr(
2398                        &expr.expr,
2399                        extension_codec,
2400                    )?)),
2401                    asc: !expr.options.descending,
2402                    nulls_first: expr.options.nulls_first,
2403                });
2404                Ok(protobuf::PhysicalExprNode {
2405                    expr_type: Some(ExprType::Sort(sort_expr)),
2406                })
2407            })
2408            .collect::<Result<Vec<_>>>()?;
2409        Ok(protobuf::PhysicalPlanNode {
2410            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2411                protobuf::SortExecNode {
2412                    input: Some(Box::new(input)),
2413                    expr,
2414                    fetch: match exec.fetch() {
2415                        Some(n) => n as i64,
2416                        _ => -1,
2417                    },
2418                    preserve_partitioning: exec.preserve_partitioning(),
2419                },
2420            ))),
2421        })
2422    }
2423
2424    fn try_from_union_exec(
2425        union: &UnionExec,
2426        extension_codec: &dyn PhysicalExtensionCodec,
2427    ) -> Result<Self> {
2428        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2429        for input in union.inputs() {
2430            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2431                input.to_owned(),
2432                extension_codec,
2433            )?);
2434        }
2435        Ok(protobuf::PhysicalPlanNode {
2436            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2437                inputs,
2438            })),
2439        })
2440    }
2441
2442    fn try_from_interleave_exec(
2443        interleave: &InterleaveExec,
2444        extension_codec: &dyn PhysicalExtensionCodec,
2445    ) -> Result<Self> {
2446        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2447        for input in interleave.inputs() {
2448            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2449                input.to_owned(),
2450                extension_codec,
2451            )?);
2452        }
2453        Ok(protobuf::PhysicalPlanNode {
2454            physical_plan_type: Some(PhysicalPlanType::Interleave(
2455                protobuf::InterleaveExecNode { inputs },
2456            )),
2457        })
2458    }
2459
2460    fn try_from_sort_preserving_merge_exec(
2461        exec: &SortPreservingMergeExec,
2462        extension_codec: &dyn PhysicalExtensionCodec,
2463    ) -> Result<Self> {
2464        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2465            exec.input().to_owned(),
2466            extension_codec,
2467        )?;
2468        let expr = exec
2469            .expr()
2470            .iter()
2471            .map(|expr| {
2472                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2473                    expr: Some(Box::new(serialize_physical_expr(
2474                        &expr.expr,
2475                        extension_codec,
2476                    )?)),
2477                    asc: !expr.options.descending,
2478                    nulls_first: expr.options.nulls_first,
2479                });
2480                Ok(protobuf::PhysicalExprNode {
2481                    expr_type: Some(ExprType::Sort(sort_expr)),
2482                })
2483            })
2484            .collect::<Result<Vec<_>>>()?;
2485        Ok(protobuf::PhysicalPlanNode {
2486            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2487                protobuf::SortPreservingMergeExecNode {
2488                    input: Some(Box::new(input)),
2489                    expr,
2490                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2491                },
2492            ))),
2493        })
2494    }
2495
2496    fn try_from_nested_loop_join_exec(
2497        exec: &NestedLoopJoinExec,
2498        extension_codec: &dyn PhysicalExtensionCodec,
2499    ) -> Result<Self> {
2500        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2501            exec.left().to_owned(),
2502            extension_codec,
2503        )?;
2504        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2505            exec.right().to_owned(),
2506            extension_codec,
2507        )?;
2508
2509        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2510        let filter = exec
2511            .filter()
2512            .as_ref()
2513            .map(|f| {
2514                let expression =
2515                    serialize_physical_expr(f.expression(), extension_codec)?;
2516                let column_indices = f
2517                    .column_indices()
2518                    .iter()
2519                    .map(|i| {
2520                        let side: protobuf::JoinSide = i.side.to_owned().into();
2521                        protobuf::ColumnIndex {
2522                            index: i.index as u32,
2523                            side: side.into(),
2524                        }
2525                    })
2526                    .collect();
2527                let schema = f.schema().as_ref().try_into()?;
2528                Ok(protobuf::JoinFilter {
2529                    expression: Some(expression),
2530                    column_indices,
2531                    schema: Some(schema),
2532                })
2533            })
2534            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2535
2536        Ok(protobuf::PhysicalPlanNode {
2537            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2538                protobuf::NestedLoopJoinExecNode {
2539                    left: Some(Box::new(left)),
2540                    right: Some(Box::new(right)),
2541                    join_type: join_type.into(),
2542                    filter,
2543                    projection: exec.projection().map_or_else(Vec::new, |v| {
2544                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2545                    }),
2546                },
2547            ))),
2548        })
2549    }
2550
2551    fn try_from_window_agg_exec(
2552        exec: &WindowAggExec,
2553        extension_codec: &dyn PhysicalExtensionCodec,
2554    ) -> Result<Self> {
2555        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2556            exec.input().to_owned(),
2557            extension_codec,
2558        )?;
2559
2560        let window_expr = exec
2561            .window_expr()
2562            .iter()
2563            .map(|e| serialize_physical_window_expr(e, extension_codec))
2564            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2565
2566        let partition_keys = exec
2567            .partition_keys()
2568            .iter()
2569            .map(|e| serialize_physical_expr(e, extension_codec))
2570            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2571
2572        Ok(protobuf::PhysicalPlanNode {
2573            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2574                protobuf::WindowAggExecNode {
2575                    input: Some(Box::new(input)),
2576                    window_expr,
2577                    partition_keys,
2578                    input_order_mode: None,
2579                },
2580            ))),
2581        })
2582    }
2583
2584    fn try_from_bounded_window_agg_exec(
2585        exec: &BoundedWindowAggExec,
2586        extension_codec: &dyn PhysicalExtensionCodec,
2587    ) -> Result<Self> {
2588        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2589            exec.input().to_owned(),
2590            extension_codec,
2591        )?;
2592
2593        let window_expr = exec
2594            .window_expr()
2595            .iter()
2596            .map(|e| serialize_physical_window_expr(e, extension_codec))
2597            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2598
2599        let partition_keys = exec
2600            .partition_keys()
2601            .iter()
2602            .map(|e| serialize_physical_expr(e, extension_codec))
2603            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2604
2605        let input_order_mode = match &exec.input_order_mode {
2606            InputOrderMode::Linear => {
2607                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2608            }
2609            InputOrderMode::PartiallySorted(columns) => {
2610                window_agg_exec_node::InputOrderMode::PartiallySorted(
2611                    protobuf::PartiallySortedInputOrderMode {
2612                        columns: columns.iter().map(|c| *c as u64).collect(),
2613                    },
2614                )
2615            }
2616            InputOrderMode::Sorted => {
2617                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2618            }
2619        };
2620
2621        Ok(protobuf::PhysicalPlanNode {
2622            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2623                protobuf::WindowAggExecNode {
2624                    input: Some(Box::new(input)),
2625                    window_expr,
2626                    partition_keys,
2627                    input_order_mode: Some(input_order_mode),
2628                },
2629            ))),
2630        })
2631    }
2632
2633    fn try_from_data_sink_exec(
2634        exec: &DataSinkExec,
2635        extension_codec: &dyn PhysicalExtensionCodec,
2636    ) -> Result<Option<Self>> {
2637        let input: protobuf::PhysicalPlanNode =
2638            protobuf::PhysicalPlanNode::try_from_physical_plan(
2639                exec.input().to_owned(),
2640                extension_codec,
2641            )?;
2642        let sort_order = match exec.sort_order() {
2643            Some(requirements) => {
2644                let expr = requirements
2645                    .iter()
2646                    .map(|requirement| {
2647                        let expr: PhysicalSortExpr = requirement.to_owned().into();
2648                        let sort_expr = protobuf::PhysicalSortExprNode {
2649                            expr: Some(Box::new(serialize_physical_expr(
2650                                &expr.expr,
2651                                extension_codec,
2652                            )?)),
2653                            asc: !expr.options.descending,
2654                            nulls_first: expr.options.nulls_first,
2655                        };
2656                        Ok(sort_expr)
2657                    })
2658                    .collect::<Result<Vec<_>>>()?;
2659                Some(protobuf::PhysicalSortExprNodeCollection {
2660                    physical_sort_expr_nodes: expr,
2661                })
2662            }
2663            None => None,
2664        };
2665
2666        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
2667            return Ok(Some(protobuf::PhysicalPlanNode {
2668                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
2669                    protobuf::JsonSinkExecNode {
2670                        input: Some(Box::new(input)),
2671                        sink: Some(sink.try_into()?),
2672                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2673                        sort_order,
2674                    },
2675                ))),
2676            }));
2677        }
2678
2679        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
2680            return Ok(Some(protobuf::PhysicalPlanNode {
2681                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
2682                    protobuf::CsvSinkExecNode {
2683                        input: Some(Box::new(input)),
2684                        sink: Some(sink.try_into()?),
2685                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2686                        sort_order,
2687                    },
2688                ))),
2689            }));
2690        }
2691
2692        #[cfg(feature = "parquet")]
2693        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
2694            return Ok(Some(protobuf::PhysicalPlanNode {
2695                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
2696                    protobuf::ParquetSinkExecNode {
2697                        input: Some(Box::new(input)),
2698                        sink: Some(sink.try_into()?),
2699                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2700                        sort_order,
2701                    },
2702                ))),
2703            }));
2704        }
2705
2706        // If unknown DataSink then let extension handle it
2707        Ok(None)
2708    }
2709
2710    fn try_from_unnest_exec(
2711        exec: &UnnestExec,
2712        extension_codec: &dyn PhysicalExtensionCodec,
2713    ) -> Result<Self> {
2714        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2715            exec.input().to_owned(),
2716            extension_codec,
2717        )?;
2718
2719        Ok(protobuf::PhysicalPlanNode {
2720            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2721                protobuf::UnnestExecNode {
2722                    input: Some(Box::new(input)),
2723                    schema: Some(exec.schema().try_into()?),
2724                    list_type_columns: exec
2725                        .list_column_indices()
2726                        .iter()
2727                        .map(|c| ProtoListUnnest {
2728                            index_in_input_schema: c.index_in_input_schema as _,
2729                            depth: c.depth as _,
2730                        })
2731                        .collect(),
2732                    struct_type_columns: exec
2733                        .struct_column_indices()
2734                        .iter()
2735                        .map(|c| *c as _)
2736                        .collect(),
2737                    options: Some(exec.options().into()),
2738                },
2739            ))),
2740        })
2741    }
2742}
2743
/// Conversion between an in-memory [`ExecutionPlan`] and a serializable
/// representation of it.
///
/// `protobuf::PhysicalPlanNode` implements this trait: this module calls
/// `try_from_physical_plan` and `try_into_physical_plan` on it. Implementors
/// can be encoded to and decoded from raw bytes, and converted to and from an
/// `Arc<dyn ExecutionPlan>`.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes an instance from the bytes in `buf`.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` into the buffer `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this serialized node back into an executable plan.
    ///
    /// `registry` is the function registry available during decoding
    /// (presumably used to resolve functions by name — confirm in the impl).
    /// `runtime` is the runtime environment. `extension_codec` decodes nodes
    /// that the built-in conversion does not recognize.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the serialized form of `plan`.
    ///
    /// Child plans are converted recursively. `extension_codec` is
    /// presumably consulted for plan types that have no built-in encoding.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
2768
/// Hooks for encoding and decoding physical-plan pieces that the built-in
/// protobuf conversion does not handle itself.
///
/// Only `try_decode` and `try_encode`, which handle whole execution plan
/// nodes, must be implemented. The remaining hooks have defaults:
/// - the decoders (`try_decode_udf`, `try_decode_expr`, `try_decode_udaf`,
///   `try_decode_udwf`) return a "not implemented" error;
/// - `try_encode_expr` also returns a "not implemented" error;
/// - the UDF/UDAF/UDWF encoders succeed without writing anything to `buf`.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom execution plan node from `buf`.
    ///
    /// `inputs` are presumably the node's already-decoded child plans.
    /// `registry` is the function registry available during decoding.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes the custom execution plan `node` into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes the scalar UDF called `name` from `_buf`.
    ///
    /// The default returns a "not implemented" error that mentions `name`.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes extra state for the scalar UDF `_node` into `_buf`.
    ///
    /// The default writes nothing and succeeds.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom physical expression from `_buf`.
    ///
    /// `_inputs` are presumably its already-decoded child expressions. The
    /// default returns a "not implemented" error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes the custom physical expression `_node` into `_buf`.
    ///
    /// The default returns a "not implemented" error.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes the aggregate UDF called `name` from `_buf`.
    ///
    /// The default returns a "not implemented" error that mentions `name`.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes extra state for the aggregate UDF `_node` into `_buf`.
    ///
    /// The default writes nothing and succeeds.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window UDF called `name` from `_buf`.
    ///
    /// The default returns a "not implemented" error that mentions `name`.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes extra state for the window UDF `_node` into `_buf`.
    ///
    /// The default writes nothing and succeeds.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
2821
/// A [`PhysicalExtensionCodec`] with no custom plan-node support.
///
/// Its `try_decode` and `try_encode` always return a "not implemented"
/// error. All other hooks use the trait's default behavior.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
2824
// Only the two required methods are implemented here, and both reject every
// input. The function/expression hooks fall back to the trait defaults.
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    /// Always fails with a "not implemented" error: no custom plan nodes are
    /// known to this codec.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Always fails with a "not implemented" error and writes nothing to
    /// `_buf`.
    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
2843
2844fn into_physical_plan(
2845    node: &Option<Box<protobuf::PhysicalPlanNode>>,
2846    registry: &dyn FunctionRegistry,
2847    runtime: &RuntimeEnv,
2848    extension_codec: &dyn PhysicalExtensionCodec,
2849) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2850    if let Some(field) = node {
2851        field.try_into_physical_plan(registry, runtime, extension_codec)
2852    } else {
2853        Err(proto_error("Missing required field in protobuf"))
2854    }
2855}