datafusion_proto/physical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26    parse_physical_window_expr, parse_protobuf_file_scan_config,
27    parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31    serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::SchemaRef;
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53    CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::empty::EmptyExec;
68use datafusion::physical_plan::explain::ExplainExec;
69use datafusion::physical_plan::expressions::PhysicalSortExpr;
70use datafusion::physical_plan::filter::FilterExec;
71use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
72use datafusion::physical_plan::joins::{
73    CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
74};
75use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
76use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
77use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
78use datafusion::physical_plan::projection::ProjectionExec;
79use datafusion::physical_plan::repartition::RepartitionExec;
80use datafusion::physical_plan::sorts::sort::SortExec;
81use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
82use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
83use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
84use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
85use datafusion::physical_plan::{
86    ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
87};
88use datafusion_common::config::TableParquetOptions;
89use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
90use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
91
92use prost::bytes::BufMut;
93use prost::Message;
94
95pub mod from_proto;
96pub mod to_proto;
97
98impl AsExecutionPlan for protobuf::PhysicalPlanNode {
99    fn try_decode(buf: &[u8]) -> Result<Self>
100    where
101        Self: Sized,
102    {
103        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
104            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
105        })
106    }
107
108    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
109    where
110        B: BufMut,
111        Self: Sized,
112    {
113        self.encode(buf).map_err(|e| {
114            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
115        })
116    }
117
118    fn try_into_physical_plan(
119        &self,
120        registry: &dyn FunctionRegistry,
121        runtime: &RuntimeEnv,
122        extension_codec: &dyn PhysicalExtensionCodec,
123    ) -> Result<Arc<dyn ExecutionPlan>> {
124        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
125            proto_error(format!(
126                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
127            ))
128        })?;
129        match plan {
130            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
131                explain,
132                registry,
133                runtime,
134                extension_codec,
135            ),
136            PhysicalPlanType::Projection(projection) => self
137                .try_into_projection_physical_plan(
138                    projection,
139                    registry,
140                    runtime,
141                    extension_codec,
142                ),
143            PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
144                filter,
145                registry,
146                runtime,
147                extension_codec,
148            ),
149            PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
150                scan,
151                registry,
152                runtime,
153                extension_codec,
154            ),
155            PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
156                scan,
157                registry,
158                runtime,
159                extension_codec,
160            ),
161            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
162            PhysicalPlanType::ParquetScan(scan) => self
163                .try_into_parquet_scan_physical_plan(
164                    scan,
165                    registry,
166                    runtime,
167                    extension_codec,
168                ),
169            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
170            PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
171                scan,
172                registry,
173                runtime,
174                extension_codec,
175            ),
176            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
177                .try_into_coalesce_batches_physical_plan(
178                    coalesce_batches,
179                    registry,
180                    runtime,
181                    extension_codec,
182                ),
183            PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
184                merge,
185                registry,
186                runtime,
187                extension_codec,
188            ),
189            PhysicalPlanType::Repartition(repart) => self
190                .try_into_repartition_physical_plan(
191                    repart,
192                    registry,
193                    runtime,
194                    extension_codec,
195                ),
196            PhysicalPlanType::GlobalLimit(limit) => self
197                .try_into_global_limit_physical_plan(
198                    limit,
199                    registry,
200                    runtime,
201                    extension_codec,
202                ),
203            PhysicalPlanType::LocalLimit(limit) => self
204                .try_into_local_limit_physical_plan(
205                    limit,
206                    registry,
207                    runtime,
208                    extension_codec,
209                ),
210            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
211                window_agg,
212                registry,
213                runtime,
214                extension_codec,
215            ),
216            PhysicalPlanType::Aggregate(hash_agg) => self
217                .try_into_aggregate_physical_plan(
218                    hash_agg,
219                    registry,
220                    runtime,
221                    extension_codec,
222                ),
223            PhysicalPlanType::HashJoin(hashjoin) => self
224                .try_into_hash_join_physical_plan(
225                    hashjoin,
226                    registry,
227                    runtime,
228                    extension_codec,
229                ),
230            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
231                .try_into_symmetric_hash_join_physical_plan(
232                    sym_join,
233                    registry,
234                    runtime,
235                    extension_codec,
236                ),
237            PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
238                union,
239                registry,
240                runtime,
241                extension_codec,
242            ),
243            PhysicalPlanType::Interleave(interleave) => self
244                .try_into_interleave_physical_plan(
245                    interleave,
246                    registry,
247                    runtime,
248                    extension_codec,
249                ),
250            PhysicalPlanType::CrossJoin(crossjoin) => self
251                .try_into_cross_join_physical_plan(
252                    crossjoin,
253                    registry,
254                    runtime,
255                    extension_codec,
256                ),
257            PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
258                empty,
259                registry,
260                runtime,
261                extension_codec,
262            ),
263            PhysicalPlanType::PlaceholderRow(placeholder) => self
264                .try_into_placeholder_row_physical_plan(
265                    placeholder,
266                    registry,
267                    runtime,
268                    extension_codec,
269                ),
270            PhysicalPlanType::Sort(sort) => {
271                self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
272            }
273            PhysicalPlanType::SortPreservingMerge(sort) => self
274                .try_into_sort_preserving_merge_physical_plan(
275                    sort,
276                    registry,
277                    runtime,
278                    extension_codec,
279                ),
280            PhysicalPlanType::Extension(extension) => self
281                .try_into_extension_physical_plan(
282                    extension,
283                    registry,
284                    runtime,
285                    extension_codec,
286                ),
287            PhysicalPlanType::NestedLoopJoin(join) => self
288                .try_into_nested_loop_join_physical_plan(
289                    join,
290                    registry,
291                    runtime,
292                    extension_codec,
293                ),
294            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
295                analyze,
296                registry,
297                runtime,
298                extension_codec,
299            ),
300            PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
301                sink,
302                registry,
303                runtime,
304                extension_codec,
305            ),
306            PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
307                sink,
308                registry,
309                runtime,
310                extension_codec,
311            ),
312
313            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
314            PhysicalPlanType::ParquetSink(sink) => self
315                .try_into_parquet_sink_physical_plan(
316                    sink,
317                    registry,
318                    runtime,
319                    extension_codec,
320                ),
321            PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
322                unnest,
323                registry,
324                runtime,
325                extension_codec,
326            ),
327        }
328    }
329
330    fn try_from_physical_plan(
331        plan: Arc<dyn ExecutionPlan>,
332        extension_codec: &dyn PhysicalExtensionCodec,
333    ) -> Result<Self>
334    where
335        Self: Sized,
336    {
337        let plan_clone = Arc::clone(&plan);
338        let plan = plan.as_any();
339
340        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
341            return protobuf::PhysicalPlanNode::try_from_explain_exec(
342                exec,
343                extension_codec,
344            );
345        }
346
347        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
348            return protobuf::PhysicalPlanNode::try_from_projection_exec(
349                exec,
350                extension_codec,
351            );
352        }
353
354        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
355            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
356                exec,
357                extension_codec,
358            );
359        }
360
361        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
362            return protobuf::PhysicalPlanNode::try_from_filter_exec(
363                exec,
364                extension_codec,
365            );
366        }
367
368        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
369            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
370                limit,
371                extension_codec,
372            );
373        }
374
375        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
376            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
377                limit,
378                extension_codec,
379            );
380        }
381
382        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
383            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
384                exec,
385                extension_codec,
386            );
387        }
388
389        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
390            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
391                exec,
392                extension_codec,
393            );
394        }
395
396        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
397            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
398                exec,
399                extension_codec,
400            );
401        }
402
403        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
404            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
405                exec,
406                extension_codec,
407            );
408        }
409
410        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
411            return protobuf::PhysicalPlanNode::try_from_empty_exec(
412                empty,
413                extension_codec,
414            );
415        }
416
417        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
418            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
419                empty,
420                extension_codec,
421            );
422        }
423
424        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
425            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
426                coalesce_batches,
427                extension_codec,
428            );
429        }
430
431        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
432            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
433                data_source_exec,
434                extension_codec,
435            )? {
436                return Ok(node);
437            }
438        }
439
440        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
441            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
442                exec,
443                extension_codec,
444            );
445        }
446
447        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
448            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
449                exec,
450                extension_codec,
451            );
452        }
453
454        if let Some(exec) = plan.downcast_ref::<SortExec>() {
455            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
456        }
457
458        if let Some(union) = plan.downcast_ref::<UnionExec>() {
459            return protobuf::PhysicalPlanNode::try_from_union_exec(
460                union,
461                extension_codec,
462            );
463        }
464
465        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
466            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
467                interleave,
468                extension_codec,
469            );
470        }
471
472        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
473            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
474                exec,
475                extension_codec,
476            );
477        }
478
479        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
480            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
481                exec,
482                extension_codec,
483            );
484        }
485
486        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
487            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
488                exec,
489                extension_codec,
490            );
491        }
492
493        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
494            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
495                exec,
496                extension_codec,
497            );
498        }
499
500        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
501            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
502                exec,
503                extension_codec,
504            )? {
505                return Ok(node);
506            }
507        }
508
509        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
510            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
511                exec,
512                extension_codec,
513            );
514        }
515
516        let mut buf: Vec<u8> = vec![];
517        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
518            Ok(_) => {
519                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
520                    .children()
521                    .into_iter()
522                    .cloned()
523                    .map(|i| {
524                        protobuf::PhysicalPlanNode::try_from_physical_plan(
525                            i,
526                            extension_codec,
527                        )
528                    })
529                    .collect::<Result<_>>()?;
530
531                Ok(protobuf::PhysicalPlanNode {
532                    physical_plan_type: Some(PhysicalPlanType::Extension(
533                        protobuf::PhysicalExtensionNode { node: buf, inputs },
534                    )),
535                })
536            }
537            Err(e) => internal_err!(
538                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
539            ),
540        }
541    }
542}
543
544impl protobuf::PhysicalPlanNode {
545    fn try_into_explain_physical_plan(
546        &self,
547        explain: &protobuf::ExplainExecNode,
548        _registry: &dyn FunctionRegistry,
549        _runtime: &RuntimeEnv,
550        _extension_codec: &dyn PhysicalExtensionCodec,
551    ) -> Result<Arc<dyn ExecutionPlan>> {
552        Ok(Arc::new(ExplainExec::new(
553            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
554            explain
555                .stringified_plans
556                .iter()
557                .map(|plan| plan.into())
558                .collect(),
559            explain.verbose,
560        )))
561    }
562
563    fn try_into_projection_physical_plan(
564        &self,
565        projection: &protobuf::ProjectionExecNode,
566        registry: &dyn FunctionRegistry,
567        runtime: &RuntimeEnv,
568        extension_codec: &dyn PhysicalExtensionCodec,
569    ) -> Result<Arc<dyn ExecutionPlan>> {
570        let input: Arc<dyn ExecutionPlan> =
571            into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
572        let exprs = projection
573            .expr
574            .iter()
575            .zip(projection.expr_name.iter())
576            .map(|(expr, name)| {
577                Ok((
578                    parse_physical_expr(
579                        expr,
580                        registry,
581                        input.schema().as_ref(),
582                        extension_codec,
583                    )?,
584                    name.to_string(),
585                ))
586            })
587            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
588        Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
589    }
590
591    fn try_into_filter_physical_plan(
592        &self,
593        filter: &protobuf::FilterExecNode,
594        registry: &dyn FunctionRegistry,
595        runtime: &RuntimeEnv,
596        extension_codec: &dyn PhysicalExtensionCodec,
597    ) -> Result<Arc<dyn ExecutionPlan>> {
598        let input: Arc<dyn ExecutionPlan> =
599            into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
600        let predicate = filter
601            .expr
602            .as_ref()
603            .map(|expr| {
604                parse_physical_expr(
605                    expr,
606                    registry,
607                    input.schema().as_ref(),
608                    extension_codec,
609                )
610            })
611            .transpose()?
612            .ok_or_else(|| {
613                DataFusionError::Internal(
614                    "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
615                )
616            })?;
617        let filter_selectivity = filter.default_filter_selectivity.try_into();
618        let projection = if !filter.projection.is_empty() {
619            Some(
620                filter
621                    .projection
622                    .iter()
623                    .map(|i| *i as usize)
624                    .collect::<Vec<_>>(),
625            )
626        } else {
627            None
628        };
629        let filter =
630            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
631        match filter_selectivity {
632            Ok(filter_selectivity) => Ok(Arc::new(
633                filter.with_default_selectivity(filter_selectivity)?,
634            )),
635            Err(_) => Err(DataFusionError::Internal(
636                "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
637            )),
638        }
639    }
640
641    fn try_into_csv_scan_physical_plan(
642        &self,
643        scan: &protobuf::CsvScanExecNode,
644        registry: &dyn FunctionRegistry,
645        _runtime: &RuntimeEnv,
646        extension_codec: &dyn PhysicalExtensionCodec,
647    ) -> Result<Arc<dyn ExecutionPlan>> {
648        let escape =
649            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
650                &scan.optional_escape
651            {
652                Some(str_to_byte(escape, "escape")?)
653            } else {
654                None
655            };
656
657        let comment = if let Some(
658            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
659        ) = &scan.optional_comment
660        {
661            Some(str_to_byte(comment, "comment")?)
662        } else {
663            None
664        };
665
666        let source = Arc::new(
667            CsvSource::new(
668                scan.has_header,
669                str_to_byte(&scan.delimiter, "delimiter")?,
670                0,
671            )
672            .with_escape(escape)
673            .with_comment(comment),
674        );
675
676        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
677            scan.base_conf.as_ref().unwrap(),
678            registry,
679            extension_codec,
680            source,
681        )?)
682        .with_newlines_in_values(scan.newlines_in_values)
683        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
684        .build();
685        Ok(DataSourceExec::from_data_source(conf))
686    }
687
688    fn try_into_json_scan_physical_plan(
689        &self,
690        scan: &protobuf::JsonScanExecNode,
691        registry: &dyn FunctionRegistry,
692        _runtime: &RuntimeEnv,
693        extension_codec: &dyn PhysicalExtensionCodec,
694    ) -> Result<Arc<dyn ExecutionPlan>> {
695        let scan_conf = parse_protobuf_file_scan_config(
696            scan.base_conf.as_ref().unwrap(),
697            registry,
698            extension_codec,
699            Arc::new(JsonSource::new()),
700        )?;
701        Ok(DataSourceExec::from_data_source(scan_conf))
702    }
703
704    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
705    fn try_into_parquet_scan_physical_plan(
706        &self,
707        scan: &protobuf::ParquetScanExecNode,
708        registry: &dyn FunctionRegistry,
709        _runtime: &RuntimeEnv,
710        extension_codec: &dyn PhysicalExtensionCodec,
711    ) -> Result<Arc<dyn ExecutionPlan>> {
712        #[cfg(feature = "parquet")]
713        {
714            let schema =
715                parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
716            let predicate = scan
717                .predicate
718                .as_ref()
719                .map(|expr| {
720                    parse_physical_expr(expr, registry, schema.as_ref(), extension_codec)
721                })
722                .transpose()?;
723            let mut options = TableParquetOptions::default();
724
725            if let Some(table_options) = scan.parquet_options.as_ref() {
726                options = table_options.try_into()?;
727            }
728            let mut source = ParquetSource::new(options);
729
730            if let Some(predicate) = predicate {
731                source = source.with_predicate(predicate);
732            }
733            let base_config = parse_protobuf_file_scan_config(
734                scan.base_conf.as_ref().unwrap(),
735                registry,
736                extension_codec,
737                Arc::new(source),
738            )?;
739            Ok(DataSourceExec::from_data_source(base_config))
740        }
741        #[cfg(not(feature = "parquet"))]
742        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
743    }
744
745    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
746    fn try_into_avro_scan_physical_plan(
747        &self,
748        scan: &protobuf::AvroScanExecNode,
749        registry: &dyn FunctionRegistry,
750        _runtime: &RuntimeEnv,
751        extension_codec: &dyn PhysicalExtensionCodec,
752    ) -> Result<Arc<dyn ExecutionPlan>> {
753        #[cfg(feature = "avro")]
754        {
755            let conf = parse_protobuf_file_scan_config(
756                scan.base_conf.as_ref().unwrap(),
757                registry,
758                extension_codec,
759                Arc::new(AvroSource::new()),
760            )?;
761            Ok(DataSourceExec::from_data_source(conf))
762        }
763        #[cfg(not(feature = "avro"))]
764        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
765    }
766
767    fn try_into_coalesce_batches_physical_plan(
768        &self,
769        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
770        registry: &dyn FunctionRegistry,
771        runtime: &RuntimeEnv,
772        extension_codec: &dyn PhysicalExtensionCodec,
773    ) -> Result<Arc<dyn ExecutionPlan>> {
774        let input: Arc<dyn ExecutionPlan> = into_physical_plan(
775            &coalesce_batches.input,
776            registry,
777            runtime,
778            extension_codec,
779        )?;
780        Ok(Arc::new(
781            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
782                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
783        ))
784    }
785
786    fn try_into_merge_physical_plan(
787        &self,
788        merge: &protobuf::CoalescePartitionsExecNode,
789        registry: &dyn FunctionRegistry,
790        runtime: &RuntimeEnv,
791        extension_codec: &dyn PhysicalExtensionCodec,
792    ) -> Result<Arc<dyn ExecutionPlan>> {
793        let input: Arc<dyn ExecutionPlan> =
794            into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
795        Ok(Arc::new(
796            CoalescePartitionsExec::new(input)
797                .with_fetch(merge.fetch.map(|f| f as usize)),
798        ))
799    }
800
801    fn try_into_repartition_physical_plan(
802        &self,
803        repart: &protobuf::RepartitionExecNode,
804        registry: &dyn FunctionRegistry,
805        runtime: &RuntimeEnv,
806        extension_codec: &dyn PhysicalExtensionCodec,
807    ) -> Result<Arc<dyn ExecutionPlan>> {
808        let input: Arc<dyn ExecutionPlan> =
809            into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
810        let partitioning = parse_protobuf_partitioning(
811            repart.partitioning.as_ref(),
812            registry,
813            input.schema().as_ref(),
814            extension_codec,
815        )?;
816        Ok(Arc::new(RepartitionExec::try_new(
817            input,
818            partitioning.unwrap(),
819        )?))
820    }
821
822    fn try_into_global_limit_physical_plan(
823        &self,
824        limit: &protobuf::GlobalLimitExecNode,
825        registry: &dyn FunctionRegistry,
826        runtime: &RuntimeEnv,
827        extension_codec: &dyn PhysicalExtensionCodec,
828    ) -> Result<Arc<dyn ExecutionPlan>> {
829        let input: Arc<dyn ExecutionPlan> =
830            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
831        let fetch = if limit.fetch >= 0 {
832            Some(limit.fetch as usize)
833        } else {
834            None
835        };
836        Ok(Arc::new(GlobalLimitExec::new(
837            input,
838            limit.skip as usize,
839            fetch,
840        )))
841    }
842
843    fn try_into_local_limit_physical_plan(
844        &self,
845        limit: &protobuf::LocalLimitExecNode,
846        registry: &dyn FunctionRegistry,
847        runtime: &RuntimeEnv,
848        extension_codec: &dyn PhysicalExtensionCodec,
849    ) -> Result<Arc<dyn ExecutionPlan>> {
850        let input: Arc<dyn ExecutionPlan> =
851            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
852        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
853    }
854
855    fn try_into_window_physical_plan(
856        &self,
857        window_agg: &protobuf::WindowAggExecNode,
858        registry: &dyn FunctionRegistry,
859        runtime: &RuntimeEnv,
860        extension_codec: &dyn PhysicalExtensionCodec,
861    ) -> Result<Arc<dyn ExecutionPlan>> {
862        let input: Arc<dyn ExecutionPlan> =
863            into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
864        let input_schema = input.schema();
865
866        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
867            .window_expr
868            .iter()
869            .map(|window_expr| {
870                parse_physical_window_expr(
871                    window_expr,
872                    registry,
873                    input_schema.as_ref(),
874                    extension_codec,
875                )
876            })
877            .collect::<Result<Vec<_>, _>>()?;
878
879        let partition_keys = window_agg
880            .partition_keys
881            .iter()
882            .map(|expr| {
883                parse_physical_expr(
884                    expr,
885                    registry,
886                    input.schema().as_ref(),
887                    extension_codec,
888                )
889            })
890            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
891
892        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
893            let input_order_mode = match input_order_mode {
894                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
895                window_agg_exec_node::InputOrderMode::PartiallySorted(
896                    protobuf::PartiallySortedInputOrderMode { columns },
897                ) => InputOrderMode::PartiallySorted(
898                    columns.iter().map(|c| *c as usize).collect(),
899                ),
900                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
901            };
902
903            Ok(Arc::new(BoundedWindowAggExec::try_new(
904                physical_window_expr,
905                input,
906                input_order_mode,
907                !partition_keys.is_empty(),
908            )?))
909        } else {
910            Ok(Arc::new(WindowAggExec::try_new(
911                physical_window_expr,
912                input,
913                !partition_keys.is_empty(),
914            )?))
915        }
916    }
917
918    fn try_into_aggregate_physical_plan(
919        &self,
920        hash_agg: &protobuf::AggregateExecNode,
921        registry: &dyn FunctionRegistry,
922        runtime: &RuntimeEnv,
923        extension_codec: &dyn PhysicalExtensionCodec,
924    ) -> Result<Arc<dyn ExecutionPlan>> {
925        let input: Arc<dyn ExecutionPlan> =
926            into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
927        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
928            proto_error(format!(
929                "Received a AggregateNode message with unknown AggregateMode {}",
930                hash_agg.mode
931            ))
932        })?;
933        let agg_mode: AggregateMode = match mode {
934            protobuf::AggregateMode::Partial => AggregateMode::Partial,
935            protobuf::AggregateMode::Final => AggregateMode::Final,
936            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
937            protobuf::AggregateMode::Single => AggregateMode::Single,
938            protobuf::AggregateMode::SinglePartitioned => {
939                AggregateMode::SinglePartitioned
940            }
941        };
942
943        let num_expr = hash_agg.group_expr.len();
944
945        let group_expr = hash_agg
946            .group_expr
947            .iter()
948            .zip(hash_agg.group_expr_name.iter())
949            .map(|(expr, name)| {
950                parse_physical_expr(
951                    expr,
952                    registry,
953                    input.schema().as_ref(),
954                    extension_codec,
955                )
956                .map(|expr| (expr, name.to_string()))
957            })
958            .collect::<Result<Vec<_>, _>>()?;
959
960        let null_expr = hash_agg
961            .null_expr
962            .iter()
963            .zip(hash_agg.group_expr_name.iter())
964            .map(|(expr, name)| {
965                parse_physical_expr(
966                    expr,
967                    registry,
968                    input.schema().as_ref(),
969                    extension_codec,
970                )
971                .map(|expr| (expr, name.to_string()))
972            })
973            .collect::<Result<Vec<_>, _>>()?;
974
975        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
976            hash_agg
977                .groups
978                .chunks(num_expr)
979                .map(|g| g.to_vec())
980                .collect::<Vec<Vec<bool>>>()
981        } else {
982            vec![]
983        };
984
985        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
986            DataFusionError::Internal(
987                "input_schema in AggregateNode is missing.".to_owned(),
988            )
989        })?;
990        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
991
992        let physical_filter_expr = hash_agg
993            .filter_expr
994            .iter()
995            .map(|expr| {
996                expr.expr
997                    .as_ref()
998                    .map(|e| {
999                        parse_physical_expr(
1000                            e,
1001                            registry,
1002                            &physical_schema,
1003                            extension_codec,
1004                        )
1005                    })
1006                    .transpose()
1007            })
1008            .collect::<Result<Vec<_>, _>>()?;
1009
1010        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1011            .aggr_expr
1012            .iter()
1013            .zip(hash_agg.aggr_expr_name.iter())
1014            .map(|(expr, name)| {
1015                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1016                    proto_error("Unexpected empty aggregate physical expression")
1017                })?;
1018
1019                match expr_type {
1020                    ExprType::AggregateExpr(agg_node) => {
1021                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1022                            .expr
1023                            .iter()
1024                            .map(|e| {
1025                                parse_physical_expr(
1026                                    e,
1027                                    registry,
1028                                    &physical_schema,
1029                                    extension_codec,
1030                                )
1031                            })
1032                            .collect::<Result<Vec<_>>>()?;
1033                        let ordering_req: LexOrdering = agg_node
1034                            .ordering_req
1035                            .iter()
1036                            .map(|e| {
1037                                parse_physical_sort_expr(
1038                                    e,
1039                                    registry,
1040                                    &physical_schema,
1041                                    extension_codec,
1042                                )
1043                            })
1044                            .collect::<Result<LexOrdering>>()?;
1045                        agg_node
1046                            .aggregate_function
1047                            .as_ref()
1048                            .map(|func| match func {
1049                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1050                                    let agg_udf = match &agg_node.fun_definition {
1051                                        Some(buf) => extension_codec
1052                                            .try_decode_udaf(udaf_name, buf)?,
1053                                        None => {
1054                                            registry.udaf(udaf_name).or_else(|_| {
1055                                                extension_codec
1056                                                    .try_decode_udaf(udaf_name, &[])
1057                                            })?
1058                                        }
1059                                    };
1060
1061                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
1062                                        .schema(Arc::clone(&physical_schema))
1063                                        .alias(name)
1064                                        .with_ignore_nulls(agg_node.ignore_nulls)
1065                                        .with_distinct(agg_node.distinct)
1066                                        .order_by(ordering_req)
1067                                        .build()
1068                                        .map(Arc::new)
1069                                }
1070                            })
1071                            .transpose()?
1072                            .ok_or_else(|| {
1073                                proto_error(
1074                                    "Invalid AggregateExpr, missing aggregate_function",
1075                                )
1076                            })
1077                    }
1078                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1079                }
1080            })
1081            .collect::<Result<Vec<_>, _>>()?;
1082
1083        let limit = hash_agg
1084            .limit
1085            .as_ref()
1086            .map(|lit_value| lit_value.limit as usize);
1087
1088        let agg = AggregateExec::try_new(
1089            agg_mode,
1090            PhysicalGroupBy::new(group_expr, null_expr, groups),
1091            physical_aggr_expr,
1092            physical_filter_expr,
1093            input,
1094            physical_schema,
1095        )?;
1096
1097        let agg = agg.with_limit(limit);
1098
1099        Ok(Arc::new(agg))
1100    }
1101
1102    fn try_into_hash_join_physical_plan(
1103        &self,
1104        hashjoin: &protobuf::HashJoinExecNode,
1105        registry: &dyn FunctionRegistry,
1106        runtime: &RuntimeEnv,
1107        extension_codec: &dyn PhysicalExtensionCodec,
1108    ) -> Result<Arc<dyn ExecutionPlan>> {
1109        let left: Arc<dyn ExecutionPlan> =
1110            into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
1111        let right: Arc<dyn ExecutionPlan> =
1112            into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
1113        let left_schema = left.schema();
1114        let right_schema = right.schema();
1115        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1116            .on
1117            .iter()
1118            .map(|col| {
1119                let left = parse_physical_expr(
1120                    &col.left.clone().unwrap(),
1121                    registry,
1122                    left_schema.as_ref(),
1123                    extension_codec,
1124                )?;
1125                let right = parse_physical_expr(
1126                    &col.right.clone().unwrap(),
1127                    registry,
1128                    right_schema.as_ref(),
1129                    extension_codec,
1130                )?;
1131                Ok((left, right))
1132            })
1133            .collect::<Result<_>>()?;
1134        let join_type =
1135            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1136                proto_error(format!(
1137                    "Received a HashJoinNode message with unknown JoinType {}",
1138                    hashjoin.join_type
1139                ))
1140            })?;
1141        let filter = hashjoin
1142            .filter
1143            .as_ref()
1144            .map(|f| {
1145                let schema = f
1146                    .schema
1147                    .as_ref()
1148                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1149                    .try_into()?;
1150
1151                let expression = parse_physical_expr(
1152                    f.expression.as_ref().ok_or_else(|| {
1153                        proto_error("Unexpected empty filter expression")
1154                    })?,
1155                    registry, &schema,
1156                    extension_codec,
1157                )?;
1158                let column_indices = f.column_indices
1159                    .iter()
1160                    .map(|i| {
1161                        let side = protobuf::JoinSide::try_from(i.side)
1162                            .map_err(|_| proto_error(format!(
1163                                "Received a HashJoinNode message with JoinSide in Filter {}",
1164                                i.side))
1165                            )?;
1166
1167                        Ok(ColumnIndex {
1168                            index: i.index as usize,
1169                            side: side.into(),
1170                        })
1171                    })
1172                    .collect::<Result<Vec<_>>>()?;
1173
1174                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1175            })
1176            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1177
1178        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1179            .map_err(|_| {
1180            proto_error(format!(
1181                "Received a HashJoinNode message with unknown PartitionMode {}",
1182                hashjoin.partition_mode
1183            ))
1184        })?;
1185        let partition_mode = match partition_mode {
1186            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1187            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1188            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1189        };
1190        let projection = if !hashjoin.projection.is_empty() {
1191            Some(
1192                hashjoin
1193                    .projection
1194                    .iter()
1195                    .map(|i| *i as usize)
1196                    .collect::<Vec<_>>(),
1197            )
1198        } else {
1199            None
1200        };
1201        Ok(Arc::new(HashJoinExec::try_new(
1202            left,
1203            right,
1204            on,
1205            filter,
1206            &join_type.into(),
1207            projection,
1208            partition_mode,
1209            hashjoin.null_equals_null,
1210        )?))
1211    }
1212
1213    fn try_into_symmetric_hash_join_physical_plan(
1214        &self,
1215        sym_join: &protobuf::SymmetricHashJoinExecNode,
1216        registry: &dyn FunctionRegistry,
1217        runtime: &RuntimeEnv,
1218        extension_codec: &dyn PhysicalExtensionCodec,
1219    ) -> Result<Arc<dyn ExecutionPlan>> {
1220        let left =
1221            into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1222        let right =
1223            into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1224        let left_schema = left.schema();
1225        let right_schema = right.schema();
1226        let on = sym_join
1227            .on
1228            .iter()
1229            .map(|col| {
1230                let left = parse_physical_expr(
1231                    &col.left.clone().unwrap(),
1232                    registry,
1233                    left_schema.as_ref(),
1234                    extension_codec,
1235                )?;
1236                let right = parse_physical_expr(
1237                    &col.right.clone().unwrap(),
1238                    registry,
1239                    right_schema.as_ref(),
1240                    extension_codec,
1241                )?;
1242                Ok((left, right))
1243            })
1244            .collect::<Result<_>>()?;
1245        let join_type =
1246            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1247                proto_error(format!(
1248                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1249                    sym_join.join_type
1250                ))
1251            })?;
1252        let filter = sym_join
1253            .filter
1254            .as_ref()
1255            .map(|f| {
1256                let schema = f
1257                    .schema
1258                    .as_ref()
1259                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1260                    .try_into()?;
1261
1262                let expression = parse_physical_expr(
1263                    f.expression.as_ref().ok_or_else(|| {
1264                        proto_error("Unexpected empty filter expression")
1265                    })?,
1266                    registry, &schema,
1267                    extension_codec,
1268                )?;
1269                let column_indices = f.column_indices
1270                    .iter()
1271                    .map(|i| {
1272                        let side = protobuf::JoinSide::try_from(i.side)
1273                            .map_err(|_| proto_error(format!(
1274                                "Received a HashJoinNode message with JoinSide in Filter {}",
1275                                i.side))
1276                            )?;
1277
1278                        Ok(ColumnIndex {
1279                            index: i.index as usize,
1280                            side: side.into(),
1281                        })
1282                    })
1283                    .collect::<Result<_>>()?;
1284
1285                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1286            })
1287            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1288
1289        let left_sort_exprs = parse_physical_sort_exprs(
1290            &sym_join.left_sort_exprs,
1291            registry,
1292            &left_schema,
1293            extension_codec,
1294        )?;
1295        let left_sort_exprs = if left_sort_exprs.is_empty() {
1296            None
1297        } else {
1298            Some(left_sort_exprs)
1299        };
1300
1301        let right_sort_exprs = parse_physical_sort_exprs(
1302            &sym_join.right_sort_exprs,
1303            registry,
1304            &right_schema,
1305            extension_codec,
1306        )?;
1307        let right_sort_exprs = if right_sort_exprs.is_empty() {
1308            None
1309        } else {
1310            Some(right_sort_exprs)
1311        };
1312
1313        let partition_mode = protobuf::StreamPartitionMode::try_from(
1314            sym_join.partition_mode,
1315        )
1316        .map_err(|_| {
1317            proto_error(format!(
1318                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1319                sym_join.partition_mode
1320            ))
1321        })?;
1322        let partition_mode = match partition_mode {
1323            protobuf::StreamPartitionMode::SinglePartition => {
1324                StreamJoinPartitionMode::SinglePartition
1325            }
1326            protobuf::StreamPartitionMode::PartitionedExec => {
1327                StreamJoinPartitionMode::Partitioned
1328            }
1329        };
1330        SymmetricHashJoinExec::try_new(
1331            left,
1332            right,
1333            on,
1334            filter,
1335            &join_type.into(),
1336            sym_join.null_equals_null,
1337            left_sort_exprs,
1338            right_sort_exprs,
1339            partition_mode,
1340        )
1341        .map(|e| Arc::new(e) as _)
1342    }
1343
1344    fn try_into_union_physical_plan(
1345        &self,
1346        union: &protobuf::UnionExecNode,
1347        registry: &dyn FunctionRegistry,
1348        runtime: &RuntimeEnv,
1349        extension_codec: &dyn PhysicalExtensionCodec,
1350    ) -> Result<Arc<dyn ExecutionPlan>> {
1351        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1352        for input in &union.inputs {
1353            inputs.push(input.try_into_physical_plan(
1354                registry,
1355                runtime,
1356                extension_codec,
1357            )?);
1358        }
1359        Ok(Arc::new(UnionExec::new(inputs)))
1360    }
1361
1362    fn try_into_interleave_physical_plan(
1363        &self,
1364        interleave: &protobuf::InterleaveExecNode,
1365        registry: &dyn FunctionRegistry,
1366        runtime: &RuntimeEnv,
1367        extension_codec: &dyn PhysicalExtensionCodec,
1368    ) -> Result<Arc<dyn ExecutionPlan>> {
1369        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1370        for input in &interleave.inputs {
1371            inputs.push(input.try_into_physical_plan(
1372                registry,
1373                runtime,
1374                extension_codec,
1375            )?);
1376        }
1377        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1378    }
1379
1380    fn try_into_cross_join_physical_plan(
1381        &self,
1382        crossjoin: &protobuf::CrossJoinExecNode,
1383        registry: &dyn FunctionRegistry,
1384        runtime: &RuntimeEnv,
1385        extension_codec: &dyn PhysicalExtensionCodec,
1386    ) -> Result<Arc<dyn ExecutionPlan>> {
1387        let left: Arc<dyn ExecutionPlan> =
1388            into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1389        let right: Arc<dyn ExecutionPlan> =
1390            into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1391        Ok(Arc::new(CrossJoinExec::new(left, right)))
1392    }
1393
1394    fn try_into_empty_physical_plan(
1395        &self,
1396        empty: &protobuf::EmptyExecNode,
1397        _registry: &dyn FunctionRegistry,
1398        _runtime: &RuntimeEnv,
1399        _extension_codec: &dyn PhysicalExtensionCodec,
1400    ) -> Result<Arc<dyn ExecutionPlan>> {
1401        let schema = Arc::new(convert_required!(empty.schema)?);
1402        Ok(Arc::new(EmptyExec::new(schema)))
1403    }
1404
1405    fn try_into_placeholder_row_physical_plan(
1406        &self,
1407        placeholder: &protobuf::PlaceholderRowExecNode,
1408        _registry: &dyn FunctionRegistry,
1409        _runtime: &RuntimeEnv,
1410        _extension_codec: &dyn PhysicalExtensionCodec,
1411    ) -> Result<Arc<dyn ExecutionPlan>> {
1412        let schema = Arc::new(convert_required!(placeholder.schema)?);
1413        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1414    }
1415
1416    fn try_into_sort_physical_plan(
1417        &self,
1418        sort: &protobuf::SortExecNode,
1419        registry: &dyn FunctionRegistry,
1420        runtime: &RuntimeEnv,
1421        extension_codec: &dyn PhysicalExtensionCodec,
1422    ) -> Result<Arc<dyn ExecutionPlan>> {
1423        let input: Arc<dyn ExecutionPlan> =
1424            into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1425        let exprs = sort
1426                    .expr
1427                    .iter()
1428                    .map(|expr| {
1429                        let expr = expr.expr_type.as_ref().ok_or_else(|| {
1430                            proto_error(format!(
1431                                "physical_plan::from_proto() Unexpected expr {self:?}"
1432                            ))
1433                        })?;
1434                        if let ExprType::Sort(sort_expr) = expr {
1435                            let expr = sort_expr
1436                                .expr
1437                                .as_ref()
1438                                .ok_or_else(|| {
1439                                    proto_error(format!(
1440                                        "physical_plan::from_proto() Unexpected sort expr {self:?}"
1441                                    ))
1442                                })?
1443                                .as_ref();
1444                            Ok(PhysicalSortExpr {
1445                                expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
1446                                options: SortOptions {
1447                                    descending: !sort_expr.asc,
1448                                    nulls_first: sort_expr.nulls_first,
1449                                },
1450                            })
1451                        } else {
1452                            internal_err!(
1453                                "physical_plan::from_proto() {self:?}"
1454                            )
1455                        }
1456                    })
1457                    .collect::<Result<LexOrdering, _>>()?;
1458        let fetch = if sort.fetch < 0 {
1459            None
1460        } else {
1461            Some(sort.fetch as usize)
1462        };
1463        let new_sort = SortExec::new(exprs, input)
1464            .with_fetch(fetch)
1465            .with_preserve_partitioning(sort.preserve_partitioning);
1466
1467        Ok(Arc::new(new_sort))
1468    }
1469
1470    fn try_into_sort_preserving_merge_physical_plan(
1471        &self,
1472        sort: &protobuf::SortPreservingMergeExecNode,
1473        registry: &dyn FunctionRegistry,
1474        runtime: &RuntimeEnv,
1475        extension_codec: &dyn PhysicalExtensionCodec,
1476    ) -> Result<Arc<dyn ExecutionPlan>> {
1477        let input: Arc<dyn ExecutionPlan> =
1478            into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1479        let exprs = sort
1480            .expr
1481            .iter()
1482            .map(|expr| {
1483                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1484                    proto_error(format!(
1485                        "physical_plan::from_proto() Unexpected expr {self:?}"
1486                    ))
1487                })?;
1488                if let ExprType::Sort(sort_expr) = expr {
1489                    let expr = sort_expr
1490                        .expr
1491                        .as_ref()
1492                        .ok_or_else(|| {
1493                            proto_error(format!(
1494                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1495                        ))
1496                        })?
1497                        .as_ref();
1498                    Ok(PhysicalSortExpr {
1499                        expr: parse_physical_expr(
1500                            expr,
1501                            registry,
1502                            input.schema().as_ref(),
1503                            extension_codec,
1504                        )?,
1505                        options: SortOptions {
1506                            descending: !sort_expr.asc,
1507                            nulls_first: sort_expr.nulls_first,
1508                        },
1509                    })
1510                } else {
1511                    internal_err!("physical_plan::from_proto() {self:?}")
1512                }
1513            })
1514            .collect::<Result<LexOrdering, _>>()?;
1515        let fetch = if sort.fetch < 0 {
1516            None
1517        } else {
1518            Some(sort.fetch as usize)
1519        };
1520        Ok(Arc::new(
1521            SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
1522        ))
1523    }
1524
1525    fn try_into_extension_physical_plan(
1526        &self,
1527        extension: &protobuf::PhysicalExtensionNode,
1528        registry: &dyn FunctionRegistry,
1529        runtime: &RuntimeEnv,
1530        extension_codec: &dyn PhysicalExtensionCodec,
1531    ) -> Result<Arc<dyn ExecutionPlan>> {
1532        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1533            .inputs
1534            .iter()
1535            .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1536            .collect::<Result<_>>()?;
1537
1538        let extension_node =
1539            extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1540
1541        Ok(extension_node)
1542    }
1543
1544    fn try_into_nested_loop_join_physical_plan(
1545        &self,
1546        join: &protobuf::NestedLoopJoinExecNode,
1547        registry: &dyn FunctionRegistry,
1548        runtime: &RuntimeEnv,
1549        extension_codec: &dyn PhysicalExtensionCodec,
1550    ) -> Result<Arc<dyn ExecutionPlan>> {
1551        let left: Arc<dyn ExecutionPlan> =
1552            into_physical_plan(&join.left, registry, runtime, extension_codec)?;
1553        let right: Arc<dyn ExecutionPlan> =
1554            into_physical_plan(&join.right, registry, runtime, extension_codec)?;
1555        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1556            proto_error(format!(
1557                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1558                join.join_type
1559            ))
1560        })?;
1561        let filter = join
1562                    .filter
1563                    .as_ref()
1564                    .map(|f| {
1565                        let schema = f
1566                            .schema
1567                            .as_ref()
1568                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1569                            .try_into()?;
1570
1571                        let expression = parse_physical_expr(
1572                            f.expression.as_ref().ok_or_else(|| {
1573                                proto_error("Unexpected empty filter expression")
1574                            })?,
1575                            registry, &schema,
1576                            extension_codec,
1577                        )?;
1578                        let column_indices = f.column_indices
1579                            .iter()
1580                            .map(|i| {
1581                                let side = protobuf::JoinSide::try_from(i.side)
1582                                    .map_err(|_| proto_error(format!(
1583                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1584                                        i.side))
1585                                    )?;
1586
1587                                Ok(ColumnIndex {
1588                                    index: i.index as usize,
1589                                    side: side.into(),
1590                                })
1591                            })
1592                            .collect::<Result<Vec<_>>>()?;
1593
1594                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1595                    })
1596                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1597
1598        let projection = if !join.projection.is_empty() {
1599            Some(
1600                join.projection
1601                    .iter()
1602                    .map(|i| *i as usize)
1603                    .collect::<Vec<_>>(),
1604            )
1605        } else {
1606            None
1607        };
1608
1609        Ok(Arc::new(NestedLoopJoinExec::try_new(
1610            left,
1611            right,
1612            filter,
1613            &join_type.into(),
1614            projection,
1615        )?))
1616    }
1617
1618    fn try_into_analyze_physical_plan(
1619        &self,
1620        analyze: &protobuf::AnalyzeExecNode,
1621        registry: &dyn FunctionRegistry,
1622        runtime: &RuntimeEnv,
1623        extension_codec: &dyn PhysicalExtensionCodec,
1624    ) -> Result<Arc<dyn ExecutionPlan>> {
1625        let input: Arc<dyn ExecutionPlan> =
1626            into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1627        Ok(Arc::new(AnalyzeExec::new(
1628            analyze.verbose,
1629            analyze.show_statistics,
1630            input,
1631            Arc::new(convert_required!(analyze.schema)?),
1632        )))
1633    }
1634
1635    fn try_into_json_sink_physical_plan(
1636        &self,
1637        sink: &protobuf::JsonSinkExecNode,
1638        registry: &dyn FunctionRegistry,
1639        runtime: &RuntimeEnv,
1640        extension_codec: &dyn PhysicalExtensionCodec,
1641    ) -> Result<Arc<dyn ExecutionPlan>> {
1642        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1643
1644        let data_sink: JsonSink = sink
1645            .sink
1646            .as_ref()
1647            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1648            .try_into()?;
1649        let sink_schema = input.schema();
1650        let sort_order = sink
1651            .sort_order
1652            .as_ref()
1653            .map(|collection| {
1654                parse_physical_sort_exprs(
1655                    &collection.physical_sort_expr_nodes,
1656                    registry,
1657                    &sink_schema,
1658                    extension_codec,
1659                )
1660                .map(LexRequirement::from)
1661            })
1662            .transpose()?;
1663        Ok(Arc::new(DataSinkExec::new(
1664            input,
1665            Arc::new(data_sink),
1666            sort_order,
1667        )))
1668    }
1669
1670    fn try_into_csv_sink_physical_plan(
1671        &self,
1672        sink: &protobuf::CsvSinkExecNode,
1673        registry: &dyn FunctionRegistry,
1674        runtime: &RuntimeEnv,
1675        extension_codec: &dyn PhysicalExtensionCodec,
1676    ) -> Result<Arc<dyn ExecutionPlan>> {
1677        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1678
1679        let data_sink: CsvSink = sink
1680            .sink
1681            .as_ref()
1682            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1683            .try_into()?;
1684        let sink_schema = input.schema();
1685        let sort_order = sink
1686            .sort_order
1687            .as_ref()
1688            .map(|collection| {
1689                parse_physical_sort_exprs(
1690                    &collection.physical_sort_expr_nodes,
1691                    registry,
1692                    &sink_schema,
1693                    extension_codec,
1694                )
1695                .map(LexRequirement::from)
1696            })
1697            .transpose()?;
1698        Ok(Arc::new(DataSinkExec::new(
1699            input,
1700            Arc::new(data_sink),
1701            sort_order,
1702        )))
1703    }
1704
1705    fn try_into_parquet_sink_physical_plan(
1706        &self,
1707        sink: &protobuf::ParquetSinkExecNode,
1708        registry: &dyn FunctionRegistry,
1709        runtime: &RuntimeEnv,
1710        extension_codec: &dyn PhysicalExtensionCodec,
1711    ) -> Result<Arc<dyn ExecutionPlan>> {
1712        #[cfg(feature = "parquet")]
1713        {
1714            let input =
1715                into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1716
1717            let data_sink: ParquetSink = sink
1718                .sink
1719                .as_ref()
1720                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1721                .try_into()?;
1722            let sink_schema = input.schema();
1723            let sort_order = sink
1724                .sort_order
1725                .as_ref()
1726                .map(|collection| {
1727                    parse_physical_sort_exprs(
1728                        &collection.physical_sort_expr_nodes,
1729                        registry,
1730                        &sink_schema,
1731                        extension_codec,
1732                    )
1733                    .map(LexRequirement::from)
1734                })
1735                .transpose()?;
1736            Ok(Arc::new(DataSinkExec::new(
1737                input,
1738                Arc::new(data_sink),
1739                sort_order,
1740            )))
1741        }
1742        #[cfg(not(feature = "parquet"))]
1743        panic!("Trying to use ParquetSink without `parquet` feature enabled");
1744    }
1745
1746    fn try_into_unnest_physical_plan(
1747        &self,
1748        unnest: &protobuf::UnnestExecNode,
1749        registry: &dyn FunctionRegistry,
1750        runtime: &RuntimeEnv,
1751        extension_codec: &dyn PhysicalExtensionCodec,
1752    ) -> Result<Arc<dyn ExecutionPlan>> {
1753        let input =
1754            into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1755
1756        Ok(Arc::new(UnnestExec::new(
1757            input,
1758            unnest
1759                .list_type_columns
1760                .iter()
1761                .map(|c| ListUnnest {
1762                    index_in_input_schema: c.index_in_input_schema as _,
1763                    depth: c.depth as _,
1764                })
1765                .collect(),
1766            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1767            Arc::new(convert_required!(unnest.schema)?),
1768            into_required!(unnest.options)?,
1769        )))
1770    }
1771
1772    fn try_from_explain_exec(
1773        exec: &ExplainExec,
1774        _extension_codec: &dyn PhysicalExtensionCodec,
1775    ) -> Result<Self> {
1776        Ok(protobuf::PhysicalPlanNode {
1777            physical_plan_type: Some(PhysicalPlanType::Explain(
1778                protobuf::ExplainExecNode {
1779                    schema: Some(exec.schema().as_ref().try_into()?),
1780                    stringified_plans: exec
1781                        .stringified_plans()
1782                        .iter()
1783                        .map(|plan| plan.into())
1784                        .collect(),
1785                    verbose: exec.verbose(),
1786                },
1787            )),
1788        })
1789    }
1790
1791    fn try_from_projection_exec(
1792        exec: &ProjectionExec,
1793        extension_codec: &dyn PhysicalExtensionCodec,
1794    ) -> Result<Self> {
1795        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1796            exec.input().to_owned(),
1797            extension_codec,
1798        )?;
1799        let expr = exec
1800            .expr()
1801            .iter()
1802            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1803            .collect::<Result<Vec<_>>>()?;
1804        let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1805        Ok(protobuf::PhysicalPlanNode {
1806            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1807                protobuf::ProjectionExecNode {
1808                    input: Some(Box::new(input)),
1809                    expr,
1810                    expr_name,
1811                },
1812            ))),
1813        })
1814    }
1815
1816    fn try_from_analyze_exec(
1817        exec: &AnalyzeExec,
1818        extension_codec: &dyn PhysicalExtensionCodec,
1819    ) -> Result<Self> {
1820        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1821            exec.input().to_owned(),
1822            extension_codec,
1823        )?;
1824        Ok(protobuf::PhysicalPlanNode {
1825            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1826                protobuf::AnalyzeExecNode {
1827                    verbose: exec.verbose(),
1828                    show_statistics: exec.show_statistics(),
1829                    input: Some(Box::new(input)),
1830                    schema: Some(exec.schema().as_ref().try_into()?),
1831                },
1832            ))),
1833        })
1834    }
1835
1836    fn try_from_filter_exec(
1837        exec: &FilterExec,
1838        extension_codec: &dyn PhysicalExtensionCodec,
1839    ) -> Result<Self> {
1840        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1841            exec.input().to_owned(),
1842            extension_codec,
1843        )?;
1844        Ok(protobuf::PhysicalPlanNode {
1845            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1846                protobuf::FilterExecNode {
1847                    input: Some(Box::new(input)),
1848                    expr: Some(serialize_physical_expr(
1849                        exec.predicate(),
1850                        extension_codec,
1851                    )?),
1852                    default_filter_selectivity: exec.default_selectivity() as u32,
1853                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1854                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1855                    }),
1856                },
1857            ))),
1858        })
1859    }
1860
1861    fn try_from_global_limit_exec(
1862        limit: &GlobalLimitExec,
1863        extension_codec: &dyn PhysicalExtensionCodec,
1864    ) -> Result<Self> {
1865        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1866            limit.input().to_owned(),
1867            extension_codec,
1868        )?;
1869
1870        Ok(protobuf::PhysicalPlanNode {
1871            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1872                protobuf::GlobalLimitExecNode {
1873                    input: Some(Box::new(input)),
1874                    skip: limit.skip() as u32,
1875                    fetch: match limit.fetch() {
1876                        Some(n) => n as i64,
1877                        _ => -1, // no limit
1878                    },
1879                },
1880            ))),
1881        })
1882    }
1883
1884    fn try_from_local_limit_exec(
1885        limit: &LocalLimitExec,
1886        extension_codec: &dyn PhysicalExtensionCodec,
1887    ) -> Result<Self> {
1888        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1889            limit.input().to_owned(),
1890            extension_codec,
1891        )?;
1892        Ok(protobuf::PhysicalPlanNode {
1893            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1894                protobuf::LocalLimitExecNode {
1895                    input: Some(Box::new(input)),
1896                    fetch: limit.fetch() as u32,
1897                },
1898            ))),
1899        })
1900    }
1901
1902    fn try_from_hash_join_exec(
1903        exec: &HashJoinExec,
1904        extension_codec: &dyn PhysicalExtensionCodec,
1905    ) -> Result<Self> {
1906        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1907            exec.left().to_owned(),
1908            extension_codec,
1909        )?;
1910        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1911            exec.right().to_owned(),
1912            extension_codec,
1913        )?;
1914        let on: Vec<protobuf::JoinOn> = exec
1915            .on()
1916            .iter()
1917            .map(|tuple| {
1918                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1919                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1920                Ok::<_, DataFusionError>(protobuf::JoinOn {
1921                    left: Some(l),
1922                    right: Some(r),
1923                })
1924            })
1925            .collect::<Result<_>>()?;
1926        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1927        let filter = exec
1928            .filter()
1929            .as_ref()
1930            .map(|f| {
1931                let expression =
1932                    serialize_physical_expr(f.expression(), extension_codec)?;
1933                let column_indices = f
1934                    .column_indices()
1935                    .iter()
1936                    .map(|i| {
1937                        let side: protobuf::JoinSide = i.side.to_owned().into();
1938                        protobuf::ColumnIndex {
1939                            index: i.index as u32,
1940                            side: side.into(),
1941                        }
1942                    })
1943                    .collect();
1944                let schema = f.schema().as_ref().try_into()?;
1945                Ok(protobuf::JoinFilter {
1946                    expression: Some(expression),
1947                    column_indices,
1948                    schema: Some(schema),
1949                })
1950            })
1951            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
1952
1953        let partition_mode = match exec.partition_mode() {
1954            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
1955            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
1956            PartitionMode::Auto => protobuf::PartitionMode::Auto,
1957        };
1958
1959        Ok(protobuf::PhysicalPlanNode {
1960            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
1961                protobuf::HashJoinExecNode {
1962                    left: Some(Box::new(left)),
1963                    right: Some(Box::new(right)),
1964                    on,
1965                    join_type: join_type.into(),
1966                    partition_mode: partition_mode.into(),
1967                    null_equals_null: exec.null_equals_null(),
1968                    filter,
1969                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
1970                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1971                    }),
1972                },
1973            ))),
1974        })
1975    }
1976
1977    fn try_from_symmetric_hash_join_exec(
1978        exec: &SymmetricHashJoinExec,
1979        extension_codec: &dyn PhysicalExtensionCodec,
1980    ) -> Result<Self> {
1981        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1982            exec.left().to_owned(),
1983            extension_codec,
1984        )?;
1985        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1986            exec.right().to_owned(),
1987            extension_codec,
1988        )?;
1989        let on = exec
1990            .on()
1991            .iter()
1992            .map(|tuple| {
1993                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1994                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1995                Ok::<_, DataFusionError>(protobuf::JoinOn {
1996                    left: Some(l),
1997                    right: Some(r),
1998                })
1999            })
2000            .collect::<Result<_>>()?;
2001        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2002        let filter = exec
2003            .filter()
2004            .as_ref()
2005            .map(|f| {
2006                let expression =
2007                    serialize_physical_expr(f.expression(), extension_codec)?;
2008                let column_indices = f
2009                    .column_indices()
2010                    .iter()
2011                    .map(|i| {
2012                        let side: protobuf::JoinSide = i.side.to_owned().into();
2013                        protobuf::ColumnIndex {
2014                            index: i.index as u32,
2015                            side: side.into(),
2016                        }
2017                    })
2018                    .collect();
2019                let schema = f.schema().as_ref().try_into()?;
2020                Ok(protobuf::JoinFilter {
2021                    expression: Some(expression),
2022                    column_indices,
2023                    schema: Some(schema),
2024                })
2025            })
2026            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2027
2028        let partition_mode = match exec.partition_mode() {
2029            StreamJoinPartitionMode::SinglePartition => {
2030                protobuf::StreamPartitionMode::SinglePartition
2031            }
2032            StreamJoinPartitionMode::Partitioned => {
2033                protobuf::StreamPartitionMode::PartitionedExec
2034            }
2035        };
2036
2037        let left_sort_exprs = exec
2038            .left_sort_exprs()
2039            .map(|exprs| {
2040                exprs
2041                    .iter()
2042                    .map(|expr| {
2043                        Ok(protobuf::PhysicalSortExprNode {
2044                            expr: Some(Box::new(serialize_physical_expr(
2045                                &expr.expr,
2046                                extension_codec,
2047                            )?)),
2048                            asc: !expr.options.descending,
2049                            nulls_first: expr.options.nulls_first,
2050                        })
2051                    })
2052                    .collect::<Result<Vec<_>>>()
2053            })
2054            .transpose()?
2055            .unwrap_or(vec![]);
2056
2057        let right_sort_exprs = exec
2058            .right_sort_exprs()
2059            .map(|exprs| {
2060                exprs
2061                    .iter()
2062                    .map(|expr| {
2063                        Ok(protobuf::PhysicalSortExprNode {
2064                            expr: Some(Box::new(serialize_physical_expr(
2065                                &expr.expr,
2066                                extension_codec,
2067                            )?)),
2068                            asc: !expr.options.descending,
2069                            nulls_first: expr.options.nulls_first,
2070                        })
2071                    })
2072                    .collect::<Result<Vec<_>>>()
2073            })
2074            .transpose()?
2075            .unwrap_or(vec![]);
2076
2077        Ok(protobuf::PhysicalPlanNode {
2078            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2079                protobuf::SymmetricHashJoinExecNode {
2080                    left: Some(Box::new(left)),
2081                    right: Some(Box::new(right)),
2082                    on,
2083                    join_type: join_type.into(),
2084                    partition_mode: partition_mode.into(),
2085                    null_equals_null: exec.null_equals_null(),
2086                    left_sort_exprs,
2087                    right_sort_exprs,
2088                    filter,
2089                },
2090            ))),
2091        })
2092    }
2093
2094    fn try_from_cross_join_exec(
2095        exec: &CrossJoinExec,
2096        extension_codec: &dyn PhysicalExtensionCodec,
2097    ) -> Result<Self> {
2098        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2099            exec.left().to_owned(),
2100            extension_codec,
2101        )?;
2102        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2103            exec.right().to_owned(),
2104            extension_codec,
2105        )?;
2106        Ok(protobuf::PhysicalPlanNode {
2107            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2108                protobuf::CrossJoinExecNode {
2109                    left: Some(Box::new(left)),
2110                    right: Some(Box::new(right)),
2111                },
2112            ))),
2113        })
2114    }
2115
2116    fn try_from_aggregate_exec(
2117        exec: &AggregateExec,
2118        extension_codec: &dyn PhysicalExtensionCodec,
2119    ) -> Result<Self> {
2120        let groups: Vec<bool> = exec
2121            .group_expr()
2122            .groups()
2123            .iter()
2124            .flatten()
2125            .copied()
2126            .collect();
2127
2128        let group_names = exec
2129            .group_expr()
2130            .expr()
2131            .iter()
2132            .map(|expr| expr.1.to_owned())
2133            .collect();
2134
2135        let filter = exec
2136            .filter_expr()
2137            .iter()
2138            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2139            .collect::<Result<Vec<_>>>()?;
2140
2141        let agg = exec
2142            .aggr_expr()
2143            .iter()
2144            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2145            .collect::<Result<Vec<_>>>()?;
2146
2147        let agg_names = exec
2148            .aggr_expr()
2149            .iter()
2150            .map(|expr| expr.name().to_string())
2151            .collect::<Vec<_>>();
2152
2153        let agg_mode = match exec.mode() {
2154            AggregateMode::Partial => protobuf::AggregateMode::Partial,
2155            AggregateMode::Final => protobuf::AggregateMode::Final,
2156            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2157            AggregateMode::Single => protobuf::AggregateMode::Single,
2158            AggregateMode::SinglePartitioned => {
2159                protobuf::AggregateMode::SinglePartitioned
2160            }
2161        };
2162        let input_schema = exec.input_schema();
2163        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2164            exec.input().to_owned(),
2165            extension_codec,
2166        )?;
2167
2168        let null_expr = exec
2169            .group_expr()
2170            .null_expr()
2171            .iter()
2172            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2173            .collect::<Result<Vec<_>>>()?;
2174
2175        let group_expr = exec
2176            .group_expr()
2177            .expr()
2178            .iter()
2179            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2180            .collect::<Result<Vec<_>>>()?;
2181
2182        let limit = exec.limit().map(|value| protobuf::AggLimit {
2183            limit: value as u64,
2184        });
2185
2186        Ok(protobuf::PhysicalPlanNode {
2187            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2188                protobuf::AggregateExecNode {
2189                    group_expr,
2190                    group_expr_name: group_names,
2191                    aggr_expr: agg,
2192                    filter_expr: filter,
2193                    aggr_expr_name: agg_names,
2194                    mode: agg_mode as i32,
2195                    input: Some(Box::new(input)),
2196                    input_schema: Some(input_schema.as_ref().try_into()?),
2197                    null_expr,
2198                    groups,
2199                    limit,
2200                },
2201            ))),
2202        })
2203    }
2204
2205    fn try_from_empty_exec(
2206        empty: &EmptyExec,
2207        _extension_codec: &dyn PhysicalExtensionCodec,
2208    ) -> Result<Self> {
2209        let schema = empty.schema().as_ref().try_into()?;
2210        Ok(protobuf::PhysicalPlanNode {
2211            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2212                schema: Some(schema),
2213            })),
2214        })
2215    }
2216
2217    fn try_from_placeholder_row_exec(
2218        empty: &PlaceholderRowExec,
2219        _extension_codec: &dyn PhysicalExtensionCodec,
2220    ) -> Result<Self> {
2221        let schema = empty.schema().as_ref().try_into()?;
2222        Ok(protobuf::PhysicalPlanNode {
2223            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2224                protobuf::PlaceholderRowExecNode {
2225                    schema: Some(schema),
2226                },
2227            )),
2228        })
2229    }
2230
2231    fn try_from_coalesce_batches_exec(
2232        coalesce_batches: &CoalesceBatchesExec,
2233        extension_codec: &dyn PhysicalExtensionCodec,
2234    ) -> Result<Self> {
2235        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2236            coalesce_batches.input().to_owned(),
2237            extension_codec,
2238        )?;
2239        Ok(protobuf::PhysicalPlanNode {
2240            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2241                protobuf::CoalesceBatchesExecNode {
2242                    input: Some(Box::new(input)),
2243                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2244                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2245                },
2246            ))),
2247        })
2248    }
2249
2250    fn try_from_data_source_exec(
2251        data_source_exec: &DataSourceExec,
2252        extension_codec: &dyn PhysicalExtensionCodec,
2253    ) -> Result<Option<Self>> {
2254        let data_source = data_source_exec.data_source();
2255        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2256            let source = maybe_csv.file_source();
2257            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2258                return Ok(Some(protobuf::PhysicalPlanNode {
2259                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
2260                        protobuf::CsvScanExecNode {
2261                            base_conf: Some(serialize_file_scan_config(
2262                                maybe_csv,
2263                                extension_codec,
2264                            )?),
2265                            has_header: csv_config.has_header(),
2266                            delimiter: byte_to_string(
2267                                csv_config.delimiter(),
2268                                "delimiter",
2269                            )?,
2270                            quote: byte_to_string(csv_config.quote(), "quote")?,
2271                            optional_escape: if let Some(escape) = csv_config.escape() {
2272                                Some(
2273                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2274                                        byte_to_string(escape, "escape")?,
2275                                    ),
2276                                )
2277                            } else {
2278                                None
2279                            },
2280                            optional_comment: if let Some(comment) = csv_config.comment()
2281                            {
2282                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2283                                        byte_to_string(comment, "comment")?,
2284                                    ))
2285                            } else {
2286                                None
2287                            },
2288                            newlines_in_values: maybe_csv.newlines_in_values(),
2289                        },
2290                    )),
2291                }));
2292            }
2293        }
2294
2295        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2296            let source = scan_conf.file_source();
2297            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2298                return Ok(Some(protobuf::PhysicalPlanNode {
2299                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
2300                        protobuf::JsonScanExecNode {
2301                            base_conf: Some(serialize_file_scan_config(
2302                                scan_conf,
2303                                extension_codec,
2304                            )?),
2305                        },
2306                    )),
2307                }));
2308            }
2309        }
2310
2311        #[cfg(feature = "parquet")]
2312        if let Some((maybe_parquet, conf)) =
2313            data_source_exec.downcast_to_file_source::<ParquetSource>()
2314        {
2315            let predicate = conf
2316                .predicate()
2317                .map(|pred| serialize_physical_expr(pred, extension_codec))
2318                .transpose()?;
2319            return Ok(Some(protobuf::PhysicalPlanNode {
2320                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2321                    protobuf::ParquetScanExecNode {
2322                        base_conf: Some(serialize_file_scan_config(
2323                            maybe_parquet,
2324                            extension_codec,
2325                        )?),
2326                        predicate,
2327                        parquet_options: Some(conf.table_parquet_options().try_into()?),
2328                    },
2329                )),
2330            }));
2331        }
2332
2333        #[cfg(feature = "avro")]
2334        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2335            let source = maybe_avro.file_source();
2336            if source.as_any().downcast_ref::<AvroSource>().is_some() {
2337                return Ok(Some(protobuf::PhysicalPlanNode {
2338                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
2339                        protobuf::AvroScanExecNode {
2340                            base_conf: Some(serialize_file_scan_config(
2341                                maybe_avro,
2342                                extension_codec,
2343                            )?),
2344                        },
2345                    )),
2346                }));
2347            }
2348        }
2349
2350        Ok(None)
2351    }
2352
2353    fn try_from_coalesce_partitions_exec(
2354        exec: &CoalescePartitionsExec,
2355        extension_codec: &dyn PhysicalExtensionCodec,
2356    ) -> Result<Self> {
2357        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2358            exec.input().to_owned(),
2359            extension_codec,
2360        )?;
2361        Ok(protobuf::PhysicalPlanNode {
2362            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2363                protobuf::CoalescePartitionsExecNode {
2364                    input: Some(Box::new(input)),
2365                    fetch: exec.fetch().map(|f| f as u32),
2366                },
2367            ))),
2368        })
2369    }
2370
2371    fn try_from_repartition_exec(
2372        exec: &RepartitionExec,
2373        extension_codec: &dyn PhysicalExtensionCodec,
2374    ) -> Result<Self> {
2375        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2376            exec.input().to_owned(),
2377            extension_codec,
2378        )?;
2379
2380        let pb_partitioning =
2381            serialize_partitioning(exec.partitioning(), extension_codec)?;
2382
2383        Ok(protobuf::PhysicalPlanNode {
2384            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2385                protobuf::RepartitionExecNode {
2386                    input: Some(Box::new(input)),
2387                    partitioning: Some(pb_partitioning),
2388                },
2389            ))),
2390        })
2391    }
2392
2393    fn try_from_sort_exec(
2394        exec: &SortExec,
2395        extension_codec: &dyn PhysicalExtensionCodec,
2396    ) -> Result<Self> {
2397        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2398            exec.input().to_owned(),
2399            extension_codec,
2400        )?;
2401        let expr = exec
2402            .expr()
2403            .iter()
2404            .map(|expr| {
2405                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2406                    expr: Some(Box::new(serialize_physical_expr(
2407                        &expr.expr,
2408                        extension_codec,
2409                    )?)),
2410                    asc: !expr.options.descending,
2411                    nulls_first: expr.options.nulls_first,
2412                });
2413                Ok(protobuf::PhysicalExprNode {
2414                    expr_type: Some(ExprType::Sort(sort_expr)),
2415                })
2416            })
2417            .collect::<Result<Vec<_>>>()?;
2418        Ok(protobuf::PhysicalPlanNode {
2419            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2420                protobuf::SortExecNode {
2421                    input: Some(Box::new(input)),
2422                    expr,
2423                    fetch: match exec.fetch() {
2424                        Some(n) => n as i64,
2425                        _ => -1,
2426                    },
2427                    preserve_partitioning: exec.preserve_partitioning(),
2428                },
2429            ))),
2430        })
2431    }
2432
2433    fn try_from_union_exec(
2434        union: &UnionExec,
2435        extension_codec: &dyn PhysicalExtensionCodec,
2436    ) -> Result<Self> {
2437        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2438        for input in union.inputs() {
2439            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2440                input.to_owned(),
2441                extension_codec,
2442            )?);
2443        }
2444        Ok(protobuf::PhysicalPlanNode {
2445            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2446                inputs,
2447            })),
2448        })
2449    }
2450
2451    fn try_from_interleave_exec(
2452        interleave: &InterleaveExec,
2453        extension_codec: &dyn PhysicalExtensionCodec,
2454    ) -> Result<Self> {
2455        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2456        for input in interleave.inputs() {
2457            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2458                input.to_owned(),
2459                extension_codec,
2460            )?);
2461        }
2462        Ok(protobuf::PhysicalPlanNode {
2463            physical_plan_type: Some(PhysicalPlanType::Interleave(
2464                protobuf::InterleaveExecNode { inputs },
2465            )),
2466        })
2467    }
2468
2469    fn try_from_sort_preserving_merge_exec(
2470        exec: &SortPreservingMergeExec,
2471        extension_codec: &dyn PhysicalExtensionCodec,
2472    ) -> Result<Self> {
2473        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2474            exec.input().to_owned(),
2475            extension_codec,
2476        )?;
2477        let expr = exec
2478            .expr()
2479            .iter()
2480            .map(|expr| {
2481                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2482                    expr: Some(Box::new(serialize_physical_expr(
2483                        &expr.expr,
2484                        extension_codec,
2485                    )?)),
2486                    asc: !expr.options.descending,
2487                    nulls_first: expr.options.nulls_first,
2488                });
2489                Ok(protobuf::PhysicalExprNode {
2490                    expr_type: Some(ExprType::Sort(sort_expr)),
2491                })
2492            })
2493            .collect::<Result<Vec<_>>>()?;
2494        Ok(protobuf::PhysicalPlanNode {
2495            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2496                protobuf::SortPreservingMergeExecNode {
2497                    input: Some(Box::new(input)),
2498                    expr,
2499                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2500                },
2501            ))),
2502        })
2503    }
2504
2505    fn try_from_nested_loop_join_exec(
2506        exec: &NestedLoopJoinExec,
2507        extension_codec: &dyn PhysicalExtensionCodec,
2508    ) -> Result<Self> {
2509        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2510            exec.left().to_owned(),
2511            extension_codec,
2512        )?;
2513        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2514            exec.right().to_owned(),
2515            extension_codec,
2516        )?;
2517
2518        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2519        let filter = exec
2520            .filter()
2521            .as_ref()
2522            .map(|f| {
2523                let expression =
2524                    serialize_physical_expr(f.expression(), extension_codec)?;
2525                let column_indices = f
2526                    .column_indices()
2527                    .iter()
2528                    .map(|i| {
2529                        let side: protobuf::JoinSide = i.side.to_owned().into();
2530                        protobuf::ColumnIndex {
2531                            index: i.index as u32,
2532                            side: side.into(),
2533                        }
2534                    })
2535                    .collect();
2536                let schema = f.schema().as_ref().try_into()?;
2537                Ok(protobuf::JoinFilter {
2538                    expression: Some(expression),
2539                    column_indices,
2540                    schema: Some(schema),
2541                })
2542            })
2543            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2544
2545        Ok(protobuf::PhysicalPlanNode {
2546            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2547                protobuf::NestedLoopJoinExecNode {
2548                    left: Some(Box::new(left)),
2549                    right: Some(Box::new(right)),
2550                    join_type: join_type.into(),
2551                    filter,
2552                    projection: exec.projection().map_or_else(Vec::new, |v| {
2553                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2554                    }),
2555                },
2556            ))),
2557        })
2558    }
2559
2560    fn try_from_window_agg_exec(
2561        exec: &WindowAggExec,
2562        extension_codec: &dyn PhysicalExtensionCodec,
2563    ) -> Result<Self> {
2564        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2565            exec.input().to_owned(),
2566            extension_codec,
2567        )?;
2568
2569        let window_expr = exec
2570            .window_expr()
2571            .iter()
2572            .map(|e| serialize_physical_window_expr(e, extension_codec))
2573            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2574
2575        let partition_keys = exec
2576            .partition_keys()
2577            .iter()
2578            .map(|e| serialize_physical_expr(e, extension_codec))
2579            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2580
2581        Ok(protobuf::PhysicalPlanNode {
2582            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2583                protobuf::WindowAggExecNode {
2584                    input: Some(Box::new(input)),
2585                    window_expr,
2586                    partition_keys,
2587                    input_order_mode: None,
2588                },
2589            ))),
2590        })
2591    }
2592
2593    fn try_from_bounded_window_agg_exec(
2594        exec: &BoundedWindowAggExec,
2595        extension_codec: &dyn PhysicalExtensionCodec,
2596    ) -> Result<Self> {
2597        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2598            exec.input().to_owned(),
2599            extension_codec,
2600        )?;
2601
2602        let window_expr = exec
2603            .window_expr()
2604            .iter()
2605            .map(|e| serialize_physical_window_expr(e, extension_codec))
2606            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2607
2608        let partition_keys = exec
2609            .partition_keys()
2610            .iter()
2611            .map(|e| serialize_physical_expr(e, extension_codec))
2612            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2613
2614        let input_order_mode = match &exec.input_order_mode {
2615            InputOrderMode::Linear => {
2616                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2617            }
2618            InputOrderMode::PartiallySorted(columns) => {
2619                window_agg_exec_node::InputOrderMode::PartiallySorted(
2620                    protobuf::PartiallySortedInputOrderMode {
2621                        columns: columns.iter().map(|c| *c as u64).collect(),
2622                    },
2623                )
2624            }
2625            InputOrderMode::Sorted => {
2626                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2627            }
2628        };
2629
2630        Ok(protobuf::PhysicalPlanNode {
2631            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2632                protobuf::WindowAggExecNode {
2633                    input: Some(Box::new(input)),
2634                    window_expr,
2635                    partition_keys,
2636                    input_order_mode: Some(input_order_mode),
2637                },
2638            ))),
2639        })
2640    }
2641
2642    fn try_from_data_sink_exec(
2643        exec: &DataSinkExec,
2644        extension_codec: &dyn PhysicalExtensionCodec,
2645    ) -> Result<Option<Self>> {
2646        let input: protobuf::PhysicalPlanNode =
2647            protobuf::PhysicalPlanNode::try_from_physical_plan(
2648                exec.input().to_owned(),
2649                extension_codec,
2650            )?;
2651        let sort_order = match exec.sort_order() {
2652            Some(requirements) => {
2653                let expr = requirements
2654                    .iter()
2655                    .map(|requirement| {
2656                        let expr: PhysicalSortExpr = requirement.to_owned().into();
2657                        let sort_expr = protobuf::PhysicalSortExprNode {
2658                            expr: Some(Box::new(serialize_physical_expr(
2659                                &expr.expr,
2660                                extension_codec,
2661                            )?)),
2662                            asc: !expr.options.descending,
2663                            nulls_first: expr.options.nulls_first,
2664                        };
2665                        Ok(sort_expr)
2666                    })
2667                    .collect::<Result<Vec<_>>>()?;
2668                Some(protobuf::PhysicalSortExprNodeCollection {
2669                    physical_sort_expr_nodes: expr,
2670                })
2671            }
2672            None => None,
2673        };
2674
2675        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
2676            return Ok(Some(protobuf::PhysicalPlanNode {
2677                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
2678                    protobuf::JsonSinkExecNode {
2679                        input: Some(Box::new(input)),
2680                        sink: Some(sink.try_into()?),
2681                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2682                        sort_order,
2683                    },
2684                ))),
2685            }));
2686        }
2687
2688        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
2689            return Ok(Some(protobuf::PhysicalPlanNode {
2690                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
2691                    protobuf::CsvSinkExecNode {
2692                        input: Some(Box::new(input)),
2693                        sink: Some(sink.try_into()?),
2694                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2695                        sort_order,
2696                    },
2697                ))),
2698            }));
2699        }
2700
2701        #[cfg(feature = "parquet")]
2702        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
2703            return Ok(Some(protobuf::PhysicalPlanNode {
2704                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
2705                    protobuf::ParquetSinkExecNode {
2706                        input: Some(Box::new(input)),
2707                        sink: Some(sink.try_into()?),
2708                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2709                        sort_order,
2710                    },
2711                ))),
2712            }));
2713        }
2714
2715        // If unknown DataSink then let extension handle it
2716        Ok(None)
2717    }
2718
2719    fn try_from_unnest_exec(
2720        exec: &UnnestExec,
2721        extension_codec: &dyn PhysicalExtensionCodec,
2722    ) -> Result<Self> {
2723        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2724            exec.input().to_owned(),
2725            extension_codec,
2726        )?;
2727
2728        Ok(protobuf::PhysicalPlanNode {
2729            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2730                protobuf::UnnestExecNode {
2731                    input: Some(Box::new(input)),
2732                    schema: Some(exec.schema().try_into()?),
2733                    list_type_columns: exec
2734                        .list_column_indices()
2735                        .iter()
2736                        .map(|c| ProtoListUnnest {
2737                            index_in_input_schema: c.index_in_input_schema as _,
2738                            depth: c.depth as _,
2739                        })
2740                        .collect(),
2741                    struct_type_columns: exec
2742                        .struct_column_indices()
2743                        .iter()
2744                        .map(|c| *c as _)
2745                        .collect(),
2746                    options: Some(exec.options().into()),
2747                },
2748            ))),
2749        })
2750    }
2751}
2752
2753pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
2754    fn try_decode(buf: &[u8]) -> Result<Self>
2755    where
2756        Self: Sized;
2757
2758    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
2759    where
2760        B: BufMut,
2761        Self: Sized;
2762
2763    fn try_into_physical_plan(
2764        &self,
2765        registry: &dyn FunctionRegistry,
2766        runtime: &RuntimeEnv,
2767        extension_codec: &dyn PhysicalExtensionCodec,
2768    ) -> Result<Arc<dyn ExecutionPlan>>;
2769
2770    fn try_from_physical_plan(
2771        plan: Arc<dyn ExecutionPlan>,
2772        extension_codec: &dyn PhysicalExtensionCodec,
2773    ) -> Result<Self>
2774    where
2775        Self: Sized;
2776}
2777
2778pub trait PhysicalExtensionCodec: Debug + Send + Sync {
2779    fn try_decode(
2780        &self,
2781        buf: &[u8],
2782        inputs: &[Arc<dyn ExecutionPlan>],
2783        registry: &dyn FunctionRegistry,
2784    ) -> Result<Arc<dyn ExecutionPlan>>;
2785
2786    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
2787
2788    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
2789        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
2790    }
2791
2792    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
2793        Ok(())
2794    }
2795
2796    fn try_decode_expr(
2797        &self,
2798        _buf: &[u8],
2799        _inputs: &[Arc<dyn PhysicalExpr>],
2800    ) -> Result<Arc<dyn PhysicalExpr>> {
2801        not_impl_err!("PhysicalExtensionCodec is not provided")
2802    }
2803
2804    fn try_encode_expr(
2805        &self,
2806        _node: &Arc<dyn PhysicalExpr>,
2807        _buf: &mut Vec<u8>,
2808    ) -> Result<()> {
2809        not_impl_err!("PhysicalExtensionCodec is not provided")
2810    }
2811
2812    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
2813        not_impl_err!(
2814            "PhysicalExtensionCodec is not provided for aggregate function {name}"
2815        )
2816    }
2817
2818    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
2819        Ok(())
2820    }
2821
2822    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
2823        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
2824    }
2825
2826    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
2827        Ok(())
2828    }
2829}
2830
2831#[derive(Debug)]
2832pub struct DefaultPhysicalExtensionCodec {}
2833
2834impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
2835    fn try_decode(
2836        &self,
2837        _buf: &[u8],
2838        _inputs: &[Arc<dyn ExecutionPlan>],
2839        _registry: &dyn FunctionRegistry,
2840    ) -> Result<Arc<dyn ExecutionPlan>> {
2841        not_impl_err!("PhysicalExtensionCodec is not provided")
2842    }
2843
2844    fn try_encode(
2845        &self,
2846        _node: Arc<dyn ExecutionPlan>,
2847        _buf: &mut Vec<u8>,
2848    ) -> Result<()> {
2849        not_impl_err!("PhysicalExtensionCodec is not provided")
2850    }
2851}
2852
2853fn into_physical_plan(
2854    node: &Option<Box<protobuf::PhysicalPlanNode>>,
2855    registry: &dyn FunctionRegistry,
2856    runtime: &RuntimeEnv,
2857    extension_codec: &dyn PhysicalExtensionCodec,
2858) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2859    if let Some(field) = node {
2860        field.try_into_physical_plan(registry, runtime, extension_codec)
2861    } else {
2862        Err(proto_error("Missing required field in protobuf"))
2863    }
2864}