datafusion_proto/physical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26    parse_physical_window_expr, parse_protobuf_file_scan_config,
27    parse_protobuf_file_scan_schema,
28};
29use crate::physical_plan::to_proto::{
30    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31    serialize_physical_window_expr,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
38};
39use crate::{convert_required, into_required};
40
41use datafusion::arrow::compute::SortOptions;
42use datafusion::arrow::datatypes::{Schema, SchemaRef};
43use datafusion::datasource::file_format::csv::CsvSink;
44use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
45use datafusion::datasource::file_format::json::JsonSink;
46#[cfg(feature = "parquet")]
47use datafusion::datasource::file_format::parquet::ParquetSink;
48#[cfg(feature = "avro")]
49use datafusion::datasource::physical_plan::AvroSource;
50#[cfg(feature = "parquet")]
51use datafusion::datasource::physical_plan::ParquetSource;
52use datafusion::datasource::physical_plan::{
53    CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
54};
55use datafusion::datasource::sink::DataSinkExec;
56use datafusion::datasource::source::DataSourceExec;
57use datafusion::execution::runtime_env::RuntimeEnv;
58use datafusion::execution::FunctionRegistry;
59use datafusion::physical_expr::aggregate::AggregateExprBuilder;
60use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
61use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion::physical_plan::aggregates::AggregateMode;
63use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
64use datafusion::physical_plan::analyze::AnalyzeExec;
65use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
66use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
67use datafusion::physical_plan::coop::CooperativeExec;
68use datafusion::physical_plan::empty::EmptyExec;
69use datafusion::physical_plan::explain::ExplainExec;
70use datafusion::physical_plan::expressions::PhysicalSortExpr;
71use datafusion::physical_plan::filter::FilterExec;
72use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
73use datafusion::physical_plan::joins::{
74    CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
75};
76use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
77use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
78use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
79use datafusion::physical_plan::projection::ProjectionExec;
80use datafusion::physical_plan::repartition::RepartitionExec;
81use datafusion::physical_plan::sorts::sort::SortExec;
82use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
83use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
84use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
85use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
86use datafusion::physical_plan::{
87    ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
88};
89use datafusion_common::config::TableParquetOptions;
90use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
91use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
92
93use prost::bytes::BufMut;
94use prost::Message;
95
96pub mod from_proto;
97pub mod to_proto;
98
99impl AsExecutionPlan for protobuf::PhysicalPlanNode {
100    fn try_decode(buf: &[u8]) -> Result<Self>
101    where
102        Self: Sized,
103    {
104        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
105            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
106        })
107    }
108
109    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
110    where
111        B: BufMut,
112        Self: Sized,
113    {
114        self.encode(buf).map_err(|e| {
115            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
116        })
117    }
118
119    fn try_into_physical_plan(
120        &self,
121        registry: &dyn FunctionRegistry,
122        runtime: &RuntimeEnv,
123        extension_codec: &dyn PhysicalExtensionCodec,
124    ) -> Result<Arc<dyn ExecutionPlan>> {
125        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
126            proto_error(format!(
127                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
128            ))
129        })?;
130        match plan {
131            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
132                explain,
133                registry,
134                runtime,
135                extension_codec,
136            ),
137            PhysicalPlanType::Projection(projection) => self
138                .try_into_projection_physical_plan(
139                    projection,
140                    registry,
141                    runtime,
142                    extension_codec,
143                ),
144            PhysicalPlanType::Filter(filter) => self.try_into_filter_physical_plan(
145                filter,
146                registry,
147                runtime,
148                extension_codec,
149            ),
150            PhysicalPlanType::CsvScan(scan) => self.try_into_csv_scan_physical_plan(
151                scan,
152                registry,
153                runtime,
154                extension_codec,
155            ),
156            PhysicalPlanType::JsonScan(scan) => self.try_into_json_scan_physical_plan(
157                scan,
158                registry,
159                runtime,
160                extension_codec,
161            ),
162            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
163            PhysicalPlanType::ParquetScan(scan) => self
164                .try_into_parquet_scan_physical_plan(
165                    scan,
166                    registry,
167                    runtime,
168                    extension_codec,
169                ),
170            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
171            PhysicalPlanType::AvroScan(scan) => self.try_into_avro_scan_physical_plan(
172                scan,
173                registry,
174                runtime,
175                extension_codec,
176            ),
177            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
178                .try_into_coalesce_batches_physical_plan(
179                    coalesce_batches,
180                    registry,
181                    runtime,
182                    extension_codec,
183                ),
184            PhysicalPlanType::Merge(merge) => self.try_into_merge_physical_plan(
185                merge,
186                registry,
187                runtime,
188                extension_codec,
189            ),
190            PhysicalPlanType::Repartition(repart) => self
191                .try_into_repartition_physical_plan(
192                    repart,
193                    registry,
194                    runtime,
195                    extension_codec,
196                ),
197            PhysicalPlanType::GlobalLimit(limit) => self
198                .try_into_global_limit_physical_plan(
199                    limit,
200                    registry,
201                    runtime,
202                    extension_codec,
203                ),
204            PhysicalPlanType::LocalLimit(limit) => self
205                .try_into_local_limit_physical_plan(
206                    limit,
207                    registry,
208                    runtime,
209                    extension_codec,
210                ),
211            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
212                window_agg,
213                registry,
214                runtime,
215                extension_codec,
216            ),
217            PhysicalPlanType::Aggregate(hash_agg) => self
218                .try_into_aggregate_physical_plan(
219                    hash_agg,
220                    registry,
221                    runtime,
222                    extension_codec,
223                ),
224            PhysicalPlanType::HashJoin(hashjoin) => self
225                .try_into_hash_join_physical_plan(
226                    hashjoin,
227                    registry,
228                    runtime,
229                    extension_codec,
230                ),
231            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
232                .try_into_symmetric_hash_join_physical_plan(
233                    sym_join,
234                    registry,
235                    runtime,
236                    extension_codec,
237                ),
238            PhysicalPlanType::Union(union) => self.try_into_union_physical_plan(
239                union,
240                registry,
241                runtime,
242                extension_codec,
243            ),
244            PhysicalPlanType::Interleave(interleave) => self
245                .try_into_interleave_physical_plan(
246                    interleave,
247                    registry,
248                    runtime,
249                    extension_codec,
250                ),
251            PhysicalPlanType::CrossJoin(crossjoin) => self
252                .try_into_cross_join_physical_plan(
253                    crossjoin,
254                    registry,
255                    runtime,
256                    extension_codec,
257                ),
258            PhysicalPlanType::Empty(empty) => self.try_into_empty_physical_plan(
259                empty,
260                registry,
261                runtime,
262                extension_codec,
263            ),
264            PhysicalPlanType::PlaceholderRow(placeholder) => self
265                .try_into_placeholder_row_physical_plan(
266                    placeholder,
267                    registry,
268                    runtime,
269                    extension_codec,
270                ),
271            PhysicalPlanType::Sort(sort) => {
272                self.try_into_sort_physical_plan(sort, registry, runtime, extension_codec)
273            }
274            PhysicalPlanType::SortPreservingMerge(sort) => self
275                .try_into_sort_preserving_merge_physical_plan(
276                    sort,
277                    registry,
278                    runtime,
279                    extension_codec,
280                ),
281            PhysicalPlanType::Extension(extension) => self
282                .try_into_extension_physical_plan(
283                    extension,
284                    registry,
285                    runtime,
286                    extension_codec,
287                ),
288            PhysicalPlanType::NestedLoopJoin(join) => self
289                .try_into_nested_loop_join_physical_plan(
290                    join,
291                    registry,
292                    runtime,
293                    extension_codec,
294                ),
295            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
296                analyze,
297                registry,
298                runtime,
299                extension_codec,
300            ),
301            PhysicalPlanType::JsonSink(sink) => self.try_into_json_sink_physical_plan(
302                sink,
303                registry,
304                runtime,
305                extension_codec,
306            ),
307            PhysicalPlanType::CsvSink(sink) => self.try_into_csv_sink_physical_plan(
308                sink,
309                registry,
310                runtime,
311                extension_codec,
312            ),
313            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
314            PhysicalPlanType::ParquetSink(sink) => self
315                .try_into_parquet_sink_physical_plan(
316                    sink,
317                    registry,
318                    runtime,
319                    extension_codec,
320                ),
321            PhysicalPlanType::Unnest(unnest) => self.try_into_unnest_physical_plan(
322                unnest,
323                registry,
324                runtime,
325                extension_codec,
326            ),
327            PhysicalPlanType::Cooperative(cooperative) => self
328                .try_into_cooperative_physical_plan(
329                    cooperative,
330                    registry,
331                    runtime,
332                    extension_codec,
333                ),
334        }
335    }
336
337    fn try_from_physical_plan(
338        plan: Arc<dyn ExecutionPlan>,
339        extension_codec: &dyn PhysicalExtensionCodec,
340    ) -> Result<Self>
341    where
342        Self: Sized,
343    {
344        let plan_clone = Arc::clone(&plan);
345        let plan = plan.as_any();
346
347        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
348            return protobuf::PhysicalPlanNode::try_from_explain_exec(
349                exec,
350                extension_codec,
351            );
352        }
353
354        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
355            return protobuf::PhysicalPlanNode::try_from_projection_exec(
356                exec,
357                extension_codec,
358            );
359        }
360
361        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
362            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
363                exec,
364                extension_codec,
365            );
366        }
367
368        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
369            return protobuf::PhysicalPlanNode::try_from_filter_exec(
370                exec,
371                extension_codec,
372            );
373        }
374
375        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
376            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
377                limit,
378                extension_codec,
379            );
380        }
381
382        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
383            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
384                limit,
385                extension_codec,
386            );
387        }
388
389        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
390            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
391                exec,
392                extension_codec,
393            );
394        }
395
396        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
397            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
398                exec,
399                extension_codec,
400            );
401        }
402
403        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
404            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
405                exec,
406                extension_codec,
407            );
408        }
409
410        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
411            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
412                exec,
413                extension_codec,
414            );
415        }
416
417        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
418            return protobuf::PhysicalPlanNode::try_from_empty_exec(
419                empty,
420                extension_codec,
421            );
422        }
423
424        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
425            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
426                empty,
427                extension_codec,
428            );
429        }
430
431        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
432            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
433                coalesce_batches,
434                extension_codec,
435            );
436        }
437
438        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
439            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
440                data_source_exec,
441                extension_codec,
442            )? {
443                return Ok(node);
444            }
445        }
446
447        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
448            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
449                exec,
450                extension_codec,
451            );
452        }
453
454        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
455            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
456                exec,
457                extension_codec,
458            );
459        }
460
461        if let Some(exec) = plan.downcast_ref::<SortExec>() {
462            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
463        }
464
465        if let Some(union) = plan.downcast_ref::<UnionExec>() {
466            return protobuf::PhysicalPlanNode::try_from_union_exec(
467                union,
468                extension_codec,
469            );
470        }
471
472        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
473            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
474                interleave,
475                extension_codec,
476            );
477        }
478
479        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
480            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
481                exec,
482                extension_codec,
483            );
484        }
485
486        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
487            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
488                exec,
489                extension_codec,
490            );
491        }
492
493        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
494            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
495                exec,
496                extension_codec,
497            );
498        }
499
500        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
501            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
502                exec,
503                extension_codec,
504            );
505        }
506
507        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
508            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
509                exec,
510                extension_codec,
511            )? {
512                return Ok(node);
513            }
514        }
515
516        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
517            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
518                exec,
519                extension_codec,
520            );
521        }
522
523        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
524            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
525                exec,
526                extension_codec,
527            );
528        }
529
530        let mut buf: Vec<u8> = vec![];
531        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
532            Ok(_) => {
533                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
534                    .children()
535                    .into_iter()
536                    .cloned()
537                    .map(|i| {
538                        protobuf::PhysicalPlanNode::try_from_physical_plan(
539                            i,
540                            extension_codec,
541                        )
542                    })
543                    .collect::<Result<_>>()?;
544
545                Ok(protobuf::PhysicalPlanNode {
546                    physical_plan_type: Some(PhysicalPlanType::Extension(
547                        protobuf::PhysicalExtensionNode { node: buf, inputs },
548                    )),
549                })
550            }
551            Err(e) => internal_err!(
552                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
553            ),
554        }
555    }
556}
557
558impl protobuf::PhysicalPlanNode {
559    fn try_into_explain_physical_plan(
560        &self,
561        explain: &protobuf::ExplainExecNode,
562        _registry: &dyn FunctionRegistry,
563        _runtime: &RuntimeEnv,
564        _extension_codec: &dyn PhysicalExtensionCodec,
565    ) -> Result<Arc<dyn ExecutionPlan>> {
566        Ok(Arc::new(ExplainExec::new(
567            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
568            explain
569                .stringified_plans
570                .iter()
571                .map(|plan| plan.into())
572                .collect(),
573            explain.verbose,
574        )))
575    }
576
577    fn try_into_projection_physical_plan(
578        &self,
579        projection: &protobuf::ProjectionExecNode,
580        registry: &dyn FunctionRegistry,
581        runtime: &RuntimeEnv,
582        extension_codec: &dyn PhysicalExtensionCodec,
583    ) -> Result<Arc<dyn ExecutionPlan>> {
584        let input: Arc<dyn ExecutionPlan> =
585            into_physical_plan(&projection.input, registry, runtime, extension_codec)?;
586        let exprs = projection
587            .expr
588            .iter()
589            .zip(projection.expr_name.iter())
590            .map(|(expr, name)| {
591                Ok((
592                    parse_physical_expr(
593                        expr,
594                        registry,
595                        input.schema().as_ref(),
596                        extension_codec,
597                    )?,
598                    name.to_string(),
599                ))
600            })
601            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
602        Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
603    }
604
605    fn try_into_filter_physical_plan(
606        &self,
607        filter: &protobuf::FilterExecNode,
608        registry: &dyn FunctionRegistry,
609        runtime: &RuntimeEnv,
610        extension_codec: &dyn PhysicalExtensionCodec,
611    ) -> Result<Arc<dyn ExecutionPlan>> {
612        let input: Arc<dyn ExecutionPlan> =
613            into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
614        let projection = if !filter.projection.is_empty() {
615            Some(
616                filter
617                    .projection
618                    .iter()
619                    .map(|i| *i as usize)
620                    .collect::<Vec<_>>(),
621            )
622        } else {
623            None
624        };
625
626        // Use the projected schema if projection is present, otherwise use the full schema
627        let predicate_schema = if let Some(ref proj_indices) = projection {
628            // Create projected schema for parsing the predicate
629            let projected_fields: Vec<_> = proj_indices
630                .iter()
631                .map(|&i| input.schema().field(i).clone())
632                .collect();
633            Arc::new(Schema::new(projected_fields))
634        } else {
635            input.schema()
636        };
637
638        let predicate = filter
639            .expr
640            .as_ref()
641            .map(|expr| {
642                parse_physical_expr(
643                    expr,
644                    registry,
645                    predicate_schema.as_ref(),
646                    extension_codec,
647                )
648            })
649            .transpose()?
650            .ok_or_else(|| {
651                DataFusionError::Internal(
652                    "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
653                )
654            })?;
655        let filter_selectivity = filter.default_filter_selectivity.try_into();
656        let filter =
657            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
658        match filter_selectivity {
659            Ok(filter_selectivity) => Ok(Arc::new(
660                filter.with_default_selectivity(filter_selectivity)?,
661            )),
662            Err(_) => Err(DataFusionError::Internal(
663                "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
664            )),
665        }
666    }
667
668    fn try_into_csv_scan_physical_plan(
669        &self,
670        scan: &protobuf::CsvScanExecNode,
671        registry: &dyn FunctionRegistry,
672        _runtime: &RuntimeEnv,
673        extension_codec: &dyn PhysicalExtensionCodec,
674    ) -> Result<Arc<dyn ExecutionPlan>> {
675        let escape =
676            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
677                &scan.optional_escape
678            {
679                Some(str_to_byte(escape, "escape")?)
680            } else {
681                None
682            };
683
684        let comment = if let Some(
685            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
686        ) = &scan.optional_comment
687        {
688            Some(str_to_byte(comment, "comment")?)
689        } else {
690            None
691        };
692
693        let source = Arc::new(
694            CsvSource::new(
695                scan.has_header,
696                str_to_byte(&scan.delimiter, "delimiter")?,
697                0,
698            )
699            .with_escape(escape)
700            .with_comment(comment),
701        );
702
703        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
704            scan.base_conf.as_ref().unwrap(),
705            registry,
706            extension_codec,
707            source,
708        )?)
709        .with_newlines_in_values(scan.newlines_in_values)
710        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
711        .build();
712        Ok(DataSourceExec::from_data_source(conf))
713    }
714
715    fn try_into_json_scan_physical_plan(
716        &self,
717        scan: &protobuf::JsonScanExecNode,
718        registry: &dyn FunctionRegistry,
719        _runtime: &RuntimeEnv,
720        extension_codec: &dyn PhysicalExtensionCodec,
721    ) -> Result<Arc<dyn ExecutionPlan>> {
722        let scan_conf = parse_protobuf_file_scan_config(
723            scan.base_conf.as_ref().unwrap(),
724            registry,
725            extension_codec,
726            Arc::new(JsonSource::new()),
727        )?;
728        Ok(DataSourceExec::from_data_source(scan_conf))
729    }
730
731    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
732    fn try_into_parquet_scan_physical_plan(
733        &self,
734        scan: &protobuf::ParquetScanExecNode,
735        registry: &dyn FunctionRegistry,
736        _runtime: &RuntimeEnv,
737        extension_codec: &dyn PhysicalExtensionCodec,
738    ) -> Result<Arc<dyn ExecutionPlan>> {
739        #[cfg(feature = "parquet")]
740        {
741            let schema =
742                parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
743
744            // Check if there's a projection and use projected schema for predicate parsing
745            let base_conf = scan.base_conf.as_ref().unwrap();
746            let predicate_schema = if !base_conf.projection.is_empty() {
747                // Create projected schema for parsing the predicate
748                let projected_fields: Vec<_> = base_conf
749                    .projection
750                    .iter()
751                    .map(|&i| schema.field(i as usize).clone())
752                    .collect();
753                Arc::new(Schema::new(projected_fields))
754            } else {
755                schema
756            };
757
758            let predicate = scan
759                .predicate
760                .as_ref()
761                .map(|expr| {
762                    parse_physical_expr(
763                        expr,
764                        registry,
765                        predicate_schema.as_ref(),
766                        extension_codec,
767                    )
768                })
769                .transpose()?;
770            let mut options = TableParquetOptions::default();
771
772            if let Some(table_options) = scan.parquet_options.as_ref() {
773                options = table_options.try_into()?;
774            }
775            let mut source = ParquetSource::new(options);
776
777            if let Some(predicate) = predicate {
778                source = source.with_predicate(predicate);
779            }
780            let base_config = parse_protobuf_file_scan_config(
781                base_conf,
782                registry,
783                extension_codec,
784                Arc::new(source),
785            )?;
786            Ok(DataSourceExec::from_data_source(base_config))
787        }
788        #[cfg(not(feature = "parquet"))]
789        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
790    }
791
792    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
793    fn try_into_avro_scan_physical_plan(
794        &self,
795        scan: &protobuf::AvroScanExecNode,
796        registry: &dyn FunctionRegistry,
797        _runtime: &RuntimeEnv,
798        extension_codec: &dyn PhysicalExtensionCodec,
799    ) -> Result<Arc<dyn ExecutionPlan>> {
800        #[cfg(feature = "avro")]
801        {
802            let conf = parse_protobuf_file_scan_config(
803                scan.base_conf.as_ref().unwrap(),
804                registry,
805                extension_codec,
806                Arc::new(AvroSource::new()),
807            )?;
808            Ok(DataSourceExec::from_data_source(conf))
809        }
810        #[cfg(not(feature = "avro"))]
811        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
812    }
813
814    fn try_into_coalesce_batches_physical_plan(
815        &self,
816        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
817        registry: &dyn FunctionRegistry,
818        runtime: &RuntimeEnv,
819        extension_codec: &dyn PhysicalExtensionCodec,
820    ) -> Result<Arc<dyn ExecutionPlan>> {
821        let input: Arc<dyn ExecutionPlan> = into_physical_plan(
822            &coalesce_batches.input,
823            registry,
824            runtime,
825            extension_codec,
826        )?;
827        Ok(Arc::new(
828            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
829                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
830        ))
831    }
832
833    fn try_into_merge_physical_plan(
834        &self,
835        merge: &protobuf::CoalescePartitionsExecNode,
836        registry: &dyn FunctionRegistry,
837        runtime: &RuntimeEnv,
838        extension_codec: &dyn PhysicalExtensionCodec,
839    ) -> Result<Arc<dyn ExecutionPlan>> {
840        let input: Arc<dyn ExecutionPlan> =
841            into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
842        Ok(Arc::new(
843            CoalescePartitionsExec::new(input)
844                .with_fetch(merge.fetch.map(|f| f as usize)),
845        ))
846    }
847
848    fn try_into_repartition_physical_plan(
849        &self,
850        repart: &protobuf::RepartitionExecNode,
851        registry: &dyn FunctionRegistry,
852        runtime: &RuntimeEnv,
853        extension_codec: &dyn PhysicalExtensionCodec,
854    ) -> Result<Arc<dyn ExecutionPlan>> {
855        let input: Arc<dyn ExecutionPlan> =
856            into_physical_plan(&repart.input, registry, runtime, extension_codec)?;
857        let partitioning = parse_protobuf_partitioning(
858            repart.partitioning.as_ref(),
859            registry,
860            input.schema().as_ref(),
861            extension_codec,
862        )?;
863        Ok(Arc::new(RepartitionExec::try_new(
864            input,
865            partitioning.unwrap(),
866        )?))
867    }
868
869    fn try_into_global_limit_physical_plan(
870        &self,
871        limit: &protobuf::GlobalLimitExecNode,
872        registry: &dyn FunctionRegistry,
873        runtime: &RuntimeEnv,
874        extension_codec: &dyn PhysicalExtensionCodec,
875    ) -> Result<Arc<dyn ExecutionPlan>> {
876        let input: Arc<dyn ExecutionPlan> =
877            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
878        let fetch = if limit.fetch >= 0 {
879            Some(limit.fetch as usize)
880        } else {
881            None
882        };
883        Ok(Arc::new(GlobalLimitExec::new(
884            input,
885            limit.skip as usize,
886            fetch,
887        )))
888    }
889
890    fn try_into_local_limit_physical_plan(
891        &self,
892        limit: &protobuf::LocalLimitExecNode,
893        registry: &dyn FunctionRegistry,
894        runtime: &RuntimeEnv,
895        extension_codec: &dyn PhysicalExtensionCodec,
896    ) -> Result<Arc<dyn ExecutionPlan>> {
897        let input: Arc<dyn ExecutionPlan> =
898            into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
899        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
900    }
901
902    fn try_into_window_physical_plan(
903        &self,
904        window_agg: &protobuf::WindowAggExecNode,
905        registry: &dyn FunctionRegistry,
906        runtime: &RuntimeEnv,
907        extension_codec: &dyn PhysicalExtensionCodec,
908    ) -> Result<Arc<dyn ExecutionPlan>> {
909        let input: Arc<dyn ExecutionPlan> =
910            into_physical_plan(&window_agg.input, registry, runtime, extension_codec)?;
911        let input_schema = input.schema();
912
913        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
914            .window_expr
915            .iter()
916            .map(|window_expr| {
917                parse_physical_window_expr(
918                    window_expr,
919                    registry,
920                    input_schema.as_ref(),
921                    extension_codec,
922                )
923            })
924            .collect::<Result<Vec<_>, _>>()?;
925
926        let partition_keys = window_agg
927            .partition_keys
928            .iter()
929            .map(|expr| {
930                parse_physical_expr(
931                    expr,
932                    registry,
933                    input.schema().as_ref(),
934                    extension_codec,
935                )
936            })
937            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
938
939        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
940            let input_order_mode = match input_order_mode {
941                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
942                window_agg_exec_node::InputOrderMode::PartiallySorted(
943                    protobuf::PartiallySortedInputOrderMode { columns },
944                ) => InputOrderMode::PartiallySorted(
945                    columns.iter().map(|c| *c as usize).collect(),
946                ),
947                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
948            };
949
950            Ok(Arc::new(BoundedWindowAggExec::try_new(
951                physical_window_expr,
952                input,
953                input_order_mode,
954                !partition_keys.is_empty(),
955            )?))
956        } else {
957            Ok(Arc::new(WindowAggExec::try_new(
958                physical_window_expr,
959                input,
960                !partition_keys.is_empty(),
961            )?))
962        }
963    }
964
965    fn try_into_aggregate_physical_plan(
966        &self,
967        hash_agg: &protobuf::AggregateExecNode,
968        registry: &dyn FunctionRegistry,
969        runtime: &RuntimeEnv,
970        extension_codec: &dyn PhysicalExtensionCodec,
971    ) -> Result<Arc<dyn ExecutionPlan>> {
972        let input: Arc<dyn ExecutionPlan> =
973            into_physical_plan(&hash_agg.input, registry, runtime, extension_codec)?;
974        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
975            proto_error(format!(
976                "Received a AggregateNode message with unknown AggregateMode {}",
977                hash_agg.mode
978            ))
979        })?;
980        let agg_mode: AggregateMode = match mode {
981            protobuf::AggregateMode::Partial => AggregateMode::Partial,
982            protobuf::AggregateMode::Final => AggregateMode::Final,
983            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
984            protobuf::AggregateMode::Single => AggregateMode::Single,
985            protobuf::AggregateMode::SinglePartitioned => {
986                AggregateMode::SinglePartitioned
987            }
988        };
989
990        let num_expr = hash_agg.group_expr.len();
991
992        let group_expr = hash_agg
993            .group_expr
994            .iter()
995            .zip(hash_agg.group_expr_name.iter())
996            .map(|(expr, name)| {
997                parse_physical_expr(
998                    expr,
999                    registry,
1000                    input.schema().as_ref(),
1001                    extension_codec,
1002                )
1003                .map(|expr| (expr, name.to_string()))
1004            })
1005            .collect::<Result<Vec<_>, _>>()?;
1006
1007        let null_expr = hash_agg
1008            .null_expr
1009            .iter()
1010            .zip(hash_agg.group_expr_name.iter())
1011            .map(|(expr, name)| {
1012                parse_physical_expr(
1013                    expr,
1014                    registry,
1015                    input.schema().as_ref(),
1016                    extension_codec,
1017                )
1018                .map(|expr| (expr, name.to_string()))
1019            })
1020            .collect::<Result<Vec<_>, _>>()?;
1021
1022        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
1023            hash_agg
1024                .groups
1025                .chunks(num_expr)
1026                .map(|g| g.to_vec())
1027                .collect::<Vec<Vec<bool>>>()
1028        } else {
1029            vec![]
1030        };
1031
1032        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
1033            DataFusionError::Internal(
1034                "input_schema in AggregateNode is missing.".to_owned(),
1035            )
1036        })?;
1037        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
1038
1039        let physical_filter_expr = hash_agg
1040            .filter_expr
1041            .iter()
1042            .map(|expr| {
1043                expr.expr
1044                    .as_ref()
1045                    .map(|e| {
1046                        parse_physical_expr(
1047                            e,
1048                            registry,
1049                            &physical_schema,
1050                            extension_codec,
1051                        )
1052                    })
1053                    .transpose()
1054            })
1055            .collect::<Result<Vec<_>, _>>()?;
1056
1057        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1058            .aggr_expr
1059            .iter()
1060            .zip(hash_agg.aggr_expr_name.iter())
1061            .map(|(expr, name)| {
1062                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1063                    proto_error("Unexpected empty aggregate physical expression")
1064                })?;
1065
1066                match expr_type {
1067                    ExprType::AggregateExpr(agg_node) => {
1068                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1069                            .expr
1070                            .iter()
1071                            .map(|e| {
1072                                parse_physical_expr(
1073                                    e,
1074                                    registry,
1075                                    &physical_schema,
1076                                    extension_codec,
1077                                )
1078                            })
1079                            .collect::<Result<Vec<_>>>()?;
1080                        let order_bys = agg_node
1081                            .ordering_req
1082                            .iter()
1083                            .map(|e| {
1084                                parse_physical_sort_expr(
1085                                    e,
1086                                    registry,
1087                                    &physical_schema,
1088                                    extension_codec,
1089                                )
1090                            })
1091                            .collect::<Result<_>>()?;
1092                        agg_node
1093                            .aggregate_function
1094                            .as_ref()
1095                            .map(|func| match func {
1096                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1097                                    let agg_udf = match &agg_node.fun_definition {
1098                                        Some(buf) => extension_codec
1099                                            .try_decode_udaf(udaf_name, buf)?,
1100                                        None => {
1101                                            registry.udaf(udaf_name).or_else(|_| {
1102                                                extension_codec
1103                                                    .try_decode_udaf(udaf_name, &[])
1104                                            })?
1105                                        }
1106                                    };
1107
1108                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
1109                                        .schema(Arc::clone(&physical_schema))
1110                                        .alias(name)
1111                                        .with_ignore_nulls(agg_node.ignore_nulls)
1112                                        .with_distinct(agg_node.distinct)
1113                                        .order_by(order_bys)
1114                                        .build()
1115                                        .map(Arc::new)
1116                                }
1117                            })
1118                            .transpose()?
1119                            .ok_or_else(|| {
1120                                proto_error(
1121                                    "Invalid AggregateExpr, missing aggregate_function",
1122                                )
1123                            })
1124                    }
1125                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1126                }
1127            })
1128            .collect::<Result<Vec<_>, _>>()?;
1129
1130        let limit = hash_agg
1131            .limit
1132            .as_ref()
1133            .map(|lit_value| lit_value.limit as usize);
1134
1135        let agg = AggregateExec::try_new(
1136            agg_mode,
1137            PhysicalGroupBy::new(group_expr, null_expr, groups),
1138            physical_aggr_expr,
1139            physical_filter_expr,
1140            input,
1141            physical_schema,
1142        )?;
1143
1144        let agg = agg.with_limit(limit);
1145
1146        Ok(Arc::new(agg))
1147    }
1148
1149    fn try_into_hash_join_physical_plan(
1150        &self,
1151        hashjoin: &protobuf::HashJoinExecNode,
1152        registry: &dyn FunctionRegistry,
1153        runtime: &RuntimeEnv,
1154        extension_codec: &dyn PhysicalExtensionCodec,
1155    ) -> Result<Arc<dyn ExecutionPlan>> {
1156        let left: Arc<dyn ExecutionPlan> =
1157            into_physical_plan(&hashjoin.left, registry, runtime, extension_codec)?;
1158        let right: Arc<dyn ExecutionPlan> =
1159            into_physical_plan(&hashjoin.right, registry, runtime, extension_codec)?;
1160        let left_schema = left.schema();
1161        let right_schema = right.schema();
1162        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1163            .on
1164            .iter()
1165            .map(|col| {
1166                let left = parse_physical_expr(
1167                    &col.left.clone().unwrap(),
1168                    registry,
1169                    left_schema.as_ref(),
1170                    extension_codec,
1171                )?;
1172                let right = parse_physical_expr(
1173                    &col.right.clone().unwrap(),
1174                    registry,
1175                    right_schema.as_ref(),
1176                    extension_codec,
1177                )?;
1178                Ok((left, right))
1179            })
1180            .collect::<Result<_>>()?;
1181        let join_type =
1182            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1183                proto_error(format!(
1184                    "Received a HashJoinNode message with unknown JoinType {}",
1185                    hashjoin.join_type
1186                ))
1187            })?;
1188        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1189            .map_err(|_| {
1190                proto_error(format!(
1191                    "Received a HashJoinNode message with unknown NullEquality {}",
1192                    hashjoin.null_equality
1193                ))
1194            })?;
1195        let filter = hashjoin
1196            .filter
1197            .as_ref()
1198            .map(|f| {
1199                let schema = f
1200                    .schema
1201                    .as_ref()
1202                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1203                    .try_into()?;
1204
1205                let expression = parse_physical_expr(
1206                    f.expression.as_ref().ok_or_else(|| {
1207                        proto_error("Unexpected empty filter expression")
1208                    })?,
1209                    registry, &schema,
1210                    extension_codec,
1211                )?;
1212                let column_indices = f.column_indices
1213                    .iter()
1214                    .map(|i| {
1215                        let side = protobuf::JoinSide::try_from(i.side)
1216                            .map_err(|_| proto_error(format!(
1217                                "Received a HashJoinNode message with JoinSide in Filter {}",
1218                                i.side))
1219                            )?;
1220
1221                        Ok(ColumnIndex {
1222                            index: i.index as usize,
1223                            side: side.into(),
1224                        })
1225                    })
1226                    .collect::<Result<Vec<_>>>()?;
1227
1228                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1229            })
1230            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1231
1232        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1233            .map_err(|_| {
1234            proto_error(format!(
1235                "Received a HashJoinNode message with unknown PartitionMode {}",
1236                hashjoin.partition_mode
1237            ))
1238        })?;
1239        let partition_mode = match partition_mode {
1240            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1241            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1242            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1243        };
1244        let projection = if !hashjoin.projection.is_empty() {
1245            Some(
1246                hashjoin
1247                    .projection
1248                    .iter()
1249                    .map(|i| *i as usize)
1250                    .collect::<Vec<_>>(),
1251            )
1252        } else {
1253            None
1254        };
1255        Ok(Arc::new(HashJoinExec::try_new(
1256            left,
1257            right,
1258            on,
1259            filter,
1260            &join_type.into(),
1261            projection,
1262            partition_mode,
1263            null_equality.into(),
1264        )?))
1265    }
1266
1267    fn try_into_symmetric_hash_join_physical_plan(
1268        &self,
1269        sym_join: &protobuf::SymmetricHashJoinExecNode,
1270        registry: &dyn FunctionRegistry,
1271        runtime: &RuntimeEnv,
1272        extension_codec: &dyn PhysicalExtensionCodec,
1273    ) -> Result<Arc<dyn ExecutionPlan>> {
1274        let left =
1275            into_physical_plan(&sym_join.left, registry, runtime, extension_codec)?;
1276        let right =
1277            into_physical_plan(&sym_join.right, registry, runtime, extension_codec)?;
1278        let left_schema = left.schema();
1279        let right_schema = right.schema();
1280        let on = sym_join
1281            .on
1282            .iter()
1283            .map(|col| {
1284                let left = parse_physical_expr(
1285                    &col.left.clone().unwrap(),
1286                    registry,
1287                    left_schema.as_ref(),
1288                    extension_codec,
1289                )?;
1290                let right = parse_physical_expr(
1291                    &col.right.clone().unwrap(),
1292                    registry,
1293                    right_schema.as_ref(),
1294                    extension_codec,
1295                )?;
1296                Ok((left, right))
1297            })
1298            .collect::<Result<_>>()?;
1299        let join_type =
1300            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1301                proto_error(format!(
1302                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1303                    sym_join.join_type
1304                ))
1305            })?;
1306        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1307            .map_err(|_| {
1308                proto_error(format!(
1309                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
1310                    sym_join.null_equality
1311                ))
1312            })?;
1313        let filter = sym_join
1314            .filter
1315            .as_ref()
1316            .map(|f| {
1317                let schema = f
1318                    .schema
1319                    .as_ref()
1320                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1321                    .try_into()?;
1322
1323                let expression = parse_physical_expr(
1324                    f.expression.as_ref().ok_or_else(|| {
1325                        proto_error("Unexpected empty filter expression")
1326                    })?,
1327                    registry, &schema,
1328                    extension_codec,
1329                )?;
1330                let column_indices = f.column_indices
1331                    .iter()
1332                    .map(|i| {
1333                        let side = protobuf::JoinSide::try_from(i.side)
1334                            .map_err(|_| proto_error(format!(
1335                                "Received a HashJoinNode message with JoinSide in Filter {}",
1336                                i.side))
1337                            )?;
1338
1339                        Ok(ColumnIndex {
1340                            index: i.index as usize,
1341                            side: side.into(),
1342                        })
1343                    })
1344                    .collect::<Result<_>>()?;
1345
1346                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1347            })
1348            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1349
1350        let left_sort_exprs = parse_physical_sort_exprs(
1351            &sym_join.left_sort_exprs,
1352            registry,
1353            &left_schema,
1354            extension_codec,
1355        )?;
1356        let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1357
1358        let right_sort_exprs = parse_physical_sort_exprs(
1359            &sym_join.right_sort_exprs,
1360            registry,
1361            &right_schema,
1362            extension_codec,
1363        )?;
1364        let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1365
1366        let partition_mode = protobuf::StreamPartitionMode::try_from(
1367            sym_join.partition_mode,
1368        )
1369        .map_err(|_| {
1370            proto_error(format!(
1371                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1372                sym_join.partition_mode
1373            ))
1374        })?;
1375        let partition_mode = match partition_mode {
1376            protobuf::StreamPartitionMode::SinglePartition => {
1377                StreamJoinPartitionMode::SinglePartition
1378            }
1379            protobuf::StreamPartitionMode::PartitionedExec => {
1380                StreamJoinPartitionMode::Partitioned
1381            }
1382        };
1383        SymmetricHashJoinExec::try_new(
1384            left,
1385            right,
1386            on,
1387            filter,
1388            &join_type.into(),
1389            null_equality.into(),
1390            left_sort_exprs,
1391            right_sort_exprs,
1392            partition_mode,
1393        )
1394        .map(|e| Arc::new(e) as _)
1395    }
1396
1397    fn try_into_union_physical_plan(
1398        &self,
1399        union: &protobuf::UnionExecNode,
1400        registry: &dyn FunctionRegistry,
1401        runtime: &RuntimeEnv,
1402        extension_codec: &dyn PhysicalExtensionCodec,
1403    ) -> Result<Arc<dyn ExecutionPlan>> {
1404        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1405        for input in &union.inputs {
1406            inputs.push(input.try_into_physical_plan(
1407                registry,
1408                runtime,
1409                extension_codec,
1410            )?);
1411        }
1412        Ok(Arc::new(UnionExec::new(inputs)))
1413    }
1414
1415    fn try_into_interleave_physical_plan(
1416        &self,
1417        interleave: &protobuf::InterleaveExecNode,
1418        registry: &dyn FunctionRegistry,
1419        runtime: &RuntimeEnv,
1420        extension_codec: &dyn PhysicalExtensionCodec,
1421    ) -> Result<Arc<dyn ExecutionPlan>> {
1422        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1423        for input in &interleave.inputs {
1424            inputs.push(input.try_into_physical_plan(
1425                registry,
1426                runtime,
1427                extension_codec,
1428            )?);
1429        }
1430        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1431    }
1432
1433    fn try_into_cross_join_physical_plan(
1434        &self,
1435        crossjoin: &protobuf::CrossJoinExecNode,
1436        registry: &dyn FunctionRegistry,
1437        runtime: &RuntimeEnv,
1438        extension_codec: &dyn PhysicalExtensionCodec,
1439    ) -> Result<Arc<dyn ExecutionPlan>> {
1440        let left: Arc<dyn ExecutionPlan> =
1441            into_physical_plan(&crossjoin.left, registry, runtime, extension_codec)?;
1442        let right: Arc<dyn ExecutionPlan> =
1443            into_physical_plan(&crossjoin.right, registry, runtime, extension_codec)?;
1444        Ok(Arc::new(CrossJoinExec::new(left, right)))
1445    }
1446
1447    fn try_into_empty_physical_plan(
1448        &self,
1449        empty: &protobuf::EmptyExecNode,
1450        _registry: &dyn FunctionRegistry,
1451        _runtime: &RuntimeEnv,
1452        _extension_codec: &dyn PhysicalExtensionCodec,
1453    ) -> Result<Arc<dyn ExecutionPlan>> {
1454        let schema = Arc::new(convert_required!(empty.schema)?);
1455        Ok(Arc::new(EmptyExec::new(schema)))
1456    }
1457
1458    fn try_into_placeholder_row_physical_plan(
1459        &self,
1460        placeholder: &protobuf::PlaceholderRowExecNode,
1461        _registry: &dyn FunctionRegistry,
1462        _runtime: &RuntimeEnv,
1463        _extension_codec: &dyn PhysicalExtensionCodec,
1464    ) -> Result<Arc<dyn ExecutionPlan>> {
1465        let schema = Arc::new(convert_required!(placeholder.schema)?);
1466        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1467    }
1468
1469    fn try_into_sort_physical_plan(
1470        &self,
1471        sort: &protobuf::SortExecNode,
1472        registry: &dyn FunctionRegistry,
1473        runtime: &RuntimeEnv,
1474        extension_codec: &dyn PhysicalExtensionCodec,
1475    ) -> Result<Arc<dyn ExecutionPlan>> {
1476        let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1477        let exprs = sort
1478            .expr
1479            .iter()
1480            .map(|expr| {
1481                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1482                    proto_error(format!(
1483                        "physical_plan::from_proto() Unexpected expr {self:?}"
1484                    ))
1485                })?;
1486                if let ExprType::Sort(sort_expr) = expr {
1487                    let expr = sort_expr
1488                        .expr
1489                        .as_ref()
1490                        .ok_or_else(|| {
1491                            proto_error(format!(
1492                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
1493                            ))
1494                        })?
1495                        .as_ref();
1496                    Ok(PhysicalSortExpr {
1497                        expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
1498                        options: SortOptions {
1499                            descending: !sort_expr.asc,
1500                            nulls_first: sort_expr.nulls_first,
1501                        },
1502                    })
1503                } else {
1504                    internal_err!(
1505                        "physical_plan::from_proto() {self:?}"
1506                    )
1507                }
1508            })
1509            .collect::<Result<Vec<_>>>()?;
1510        let Some(ordering) = LexOrdering::new(exprs) else {
1511            return internal_err!("SortExec requires an ordering");
1512        };
1513        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1514        let new_sort = SortExec::new(ordering, input)
1515            .with_fetch(fetch)
1516            .with_preserve_partitioning(sort.preserve_partitioning);
1517
1518        Ok(Arc::new(new_sort))
1519    }
1520
1521    fn try_into_sort_preserving_merge_physical_plan(
1522        &self,
1523        sort: &protobuf::SortPreservingMergeExecNode,
1524        registry: &dyn FunctionRegistry,
1525        runtime: &RuntimeEnv,
1526        extension_codec: &dyn PhysicalExtensionCodec,
1527    ) -> Result<Arc<dyn ExecutionPlan>> {
1528        let input = into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
1529        let exprs = sort
1530            .expr
1531            .iter()
1532            .map(|expr| {
1533                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1534                    proto_error(format!(
1535                        "physical_plan::from_proto() Unexpected expr {self:?}"
1536                    ))
1537                })?;
1538                if let ExprType::Sort(sort_expr) = expr {
1539                    let expr = sort_expr
1540                        .expr
1541                        .as_ref()
1542                        .ok_or_else(|| {
1543                            proto_error(format!(
1544                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1545                        ))
1546                        })?
1547                        .as_ref();
1548                    Ok(PhysicalSortExpr {
1549                        expr: parse_physical_expr(
1550                            expr,
1551                            registry,
1552                            input.schema().as_ref(),
1553                            extension_codec,
1554                        )?,
1555                        options: SortOptions {
1556                            descending: !sort_expr.asc,
1557                            nulls_first: sort_expr.nulls_first,
1558                        },
1559                    })
1560                } else {
1561                    internal_err!("physical_plan::from_proto() {self:?}")
1562                }
1563            })
1564            .collect::<Result<Vec<_>>>()?;
1565        let Some(ordering) = LexOrdering::new(exprs) else {
1566            return internal_err!("SortExec requires an ordering");
1567        };
1568        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1569        Ok(Arc::new(
1570            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1571        ))
1572    }
1573
1574    fn try_into_extension_physical_plan(
1575        &self,
1576        extension: &protobuf::PhysicalExtensionNode,
1577        registry: &dyn FunctionRegistry,
1578        runtime: &RuntimeEnv,
1579        extension_codec: &dyn PhysicalExtensionCodec,
1580    ) -> Result<Arc<dyn ExecutionPlan>> {
1581        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1582            .inputs
1583            .iter()
1584            .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
1585            .collect::<Result<_>>()?;
1586
1587        let extension_node =
1588            extension_codec.try_decode(extension.node.as_slice(), &inputs, registry)?;
1589
1590        Ok(extension_node)
1591    }
1592
1593    fn try_into_nested_loop_join_physical_plan(
1594        &self,
1595        join: &protobuf::NestedLoopJoinExecNode,
1596        registry: &dyn FunctionRegistry,
1597        runtime: &RuntimeEnv,
1598        extension_codec: &dyn PhysicalExtensionCodec,
1599    ) -> Result<Arc<dyn ExecutionPlan>> {
1600        let left: Arc<dyn ExecutionPlan> =
1601            into_physical_plan(&join.left, registry, runtime, extension_codec)?;
1602        let right: Arc<dyn ExecutionPlan> =
1603            into_physical_plan(&join.right, registry, runtime, extension_codec)?;
1604        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1605            proto_error(format!(
1606                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1607                join.join_type
1608            ))
1609        })?;
1610        let filter = join
1611                    .filter
1612                    .as_ref()
1613                    .map(|f| {
1614                        let schema = f
1615                            .schema
1616                            .as_ref()
1617                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1618                            .try_into()?;
1619
1620                        let expression = parse_physical_expr(
1621                            f.expression.as_ref().ok_or_else(|| {
1622                                proto_error("Unexpected empty filter expression")
1623                            })?,
1624                            registry, &schema,
1625                            extension_codec,
1626                        )?;
1627                        let column_indices = f.column_indices
1628                            .iter()
1629                            .map(|i| {
1630                                let side = protobuf::JoinSide::try_from(i.side)
1631                                    .map_err(|_| proto_error(format!(
1632                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1633                                        i.side))
1634                                    )?;
1635
1636                                Ok(ColumnIndex {
1637                                    index: i.index as usize,
1638                                    side: side.into(),
1639                                })
1640                            })
1641                            .collect::<Result<Vec<_>>>()?;
1642
1643                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1644                    })
1645                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1646
1647        let projection = if !join.projection.is_empty() {
1648            Some(
1649                join.projection
1650                    .iter()
1651                    .map(|i| *i as usize)
1652                    .collect::<Vec<_>>(),
1653            )
1654        } else {
1655            None
1656        };
1657
1658        Ok(Arc::new(NestedLoopJoinExec::try_new(
1659            left,
1660            right,
1661            filter,
1662            &join_type.into(),
1663            projection,
1664        )?))
1665    }
1666
1667    fn try_into_analyze_physical_plan(
1668        &self,
1669        analyze: &protobuf::AnalyzeExecNode,
1670        registry: &dyn FunctionRegistry,
1671        runtime: &RuntimeEnv,
1672        extension_codec: &dyn PhysicalExtensionCodec,
1673    ) -> Result<Arc<dyn ExecutionPlan>> {
1674        let input: Arc<dyn ExecutionPlan> =
1675            into_physical_plan(&analyze.input, registry, runtime, extension_codec)?;
1676        Ok(Arc::new(AnalyzeExec::new(
1677            analyze.verbose,
1678            analyze.show_statistics,
1679            input,
1680            Arc::new(convert_required!(analyze.schema)?),
1681        )))
1682    }
1683
1684    fn try_into_json_sink_physical_plan(
1685        &self,
1686        sink: &protobuf::JsonSinkExecNode,
1687        registry: &dyn FunctionRegistry,
1688        runtime: &RuntimeEnv,
1689        extension_codec: &dyn PhysicalExtensionCodec,
1690    ) -> Result<Arc<dyn ExecutionPlan>> {
1691        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1692
1693        let data_sink: JsonSink = sink
1694            .sink
1695            .as_ref()
1696            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1697            .try_into()?;
1698        let sink_schema = input.schema();
1699        let sort_order = sink
1700            .sort_order
1701            .as_ref()
1702            .map(|collection| {
1703                parse_physical_sort_exprs(
1704                    &collection.physical_sort_expr_nodes,
1705                    registry,
1706                    &sink_schema,
1707                    extension_codec,
1708                )
1709                .map(|sort_exprs| {
1710                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1711                })
1712            })
1713            .transpose()?
1714            .flatten();
1715        Ok(Arc::new(DataSinkExec::new(
1716            input,
1717            Arc::new(data_sink),
1718            sort_order,
1719        )))
1720    }
1721
1722    fn try_into_csv_sink_physical_plan(
1723        &self,
1724        sink: &protobuf::CsvSinkExecNode,
1725        registry: &dyn FunctionRegistry,
1726        runtime: &RuntimeEnv,
1727        extension_codec: &dyn PhysicalExtensionCodec,
1728    ) -> Result<Arc<dyn ExecutionPlan>> {
1729        let input = into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1730
1731        let data_sink: CsvSink = sink
1732            .sink
1733            .as_ref()
1734            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1735            .try_into()?;
1736        let sink_schema = input.schema();
1737        let sort_order = sink
1738            .sort_order
1739            .as_ref()
1740            .map(|collection| {
1741                parse_physical_sort_exprs(
1742                    &collection.physical_sort_expr_nodes,
1743                    registry,
1744                    &sink_schema,
1745                    extension_codec,
1746                )
1747                .map(|sort_exprs| {
1748                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1749                })
1750            })
1751            .transpose()?
1752            .flatten();
1753        Ok(Arc::new(DataSinkExec::new(
1754            input,
1755            Arc::new(data_sink),
1756            sort_order,
1757        )))
1758    }
1759
1760    fn try_into_parquet_sink_physical_plan(
1761        &self,
1762        sink: &protobuf::ParquetSinkExecNode,
1763        registry: &dyn FunctionRegistry,
1764        runtime: &RuntimeEnv,
1765        extension_codec: &dyn PhysicalExtensionCodec,
1766    ) -> Result<Arc<dyn ExecutionPlan>> {
1767        #[cfg(feature = "parquet")]
1768        {
1769            let input =
1770                into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1771
1772            let data_sink: ParquetSink = sink
1773                .sink
1774                .as_ref()
1775                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1776                .try_into()?;
1777            let sink_schema = input.schema();
1778            let sort_order = sink
1779                .sort_order
1780                .as_ref()
1781                .map(|collection| {
1782                    parse_physical_sort_exprs(
1783                        &collection.physical_sort_expr_nodes,
1784                        registry,
1785                        &sink_schema,
1786                        extension_codec,
1787                    )
1788                    .map(|sort_exprs| {
1789                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1790                    })
1791                })
1792                .transpose()?
1793                .flatten();
1794            Ok(Arc::new(DataSinkExec::new(
1795                input,
1796                Arc::new(data_sink),
1797                sort_order,
1798            )))
1799        }
1800        #[cfg(not(feature = "parquet"))]
1801        panic!("Trying to use ParquetSink without `parquet` feature enabled");
1802    }
1803
1804    fn try_into_unnest_physical_plan(
1805        &self,
1806        unnest: &protobuf::UnnestExecNode,
1807        registry: &dyn FunctionRegistry,
1808        runtime: &RuntimeEnv,
1809        extension_codec: &dyn PhysicalExtensionCodec,
1810    ) -> Result<Arc<dyn ExecutionPlan>> {
1811        let input =
1812            into_physical_plan(&unnest.input, registry, runtime, extension_codec)?;
1813
1814        Ok(Arc::new(UnnestExec::new(
1815            input,
1816            unnest
1817                .list_type_columns
1818                .iter()
1819                .map(|c| ListUnnest {
1820                    index_in_input_schema: c.index_in_input_schema as _,
1821                    depth: c.depth as _,
1822                })
1823                .collect(),
1824            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1825            Arc::new(convert_required!(unnest.schema)?),
1826            into_required!(unnest.options)?,
1827        )))
1828    }
1829
1830    fn try_into_cooperative_physical_plan(
1831        &self,
1832        field_stream: &protobuf::CooperativeExecNode,
1833        registry: &dyn FunctionRegistry,
1834        runtime: &RuntimeEnv,
1835        extension_codec: &dyn PhysicalExtensionCodec,
1836    ) -> Result<Arc<dyn ExecutionPlan>> {
1837        let input =
1838            into_physical_plan(&field_stream.input, registry, runtime, extension_codec)?;
1839        Ok(Arc::new(CooperativeExec::new(input)))
1840    }
1841
1842    fn try_from_explain_exec(
1843        exec: &ExplainExec,
1844        _extension_codec: &dyn PhysicalExtensionCodec,
1845    ) -> Result<Self> {
1846        Ok(protobuf::PhysicalPlanNode {
1847            physical_plan_type: Some(PhysicalPlanType::Explain(
1848                protobuf::ExplainExecNode {
1849                    schema: Some(exec.schema().as_ref().try_into()?),
1850                    stringified_plans: exec
1851                        .stringified_plans()
1852                        .iter()
1853                        .map(|plan| plan.into())
1854                        .collect(),
1855                    verbose: exec.verbose(),
1856                },
1857            )),
1858        })
1859    }
1860
1861    fn try_from_projection_exec(
1862        exec: &ProjectionExec,
1863        extension_codec: &dyn PhysicalExtensionCodec,
1864    ) -> Result<Self> {
1865        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1866            exec.input().to_owned(),
1867            extension_codec,
1868        )?;
1869        let expr = exec
1870            .expr()
1871            .iter()
1872            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1873            .collect::<Result<Vec<_>>>()?;
1874        let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1875        Ok(protobuf::PhysicalPlanNode {
1876            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1877                protobuf::ProjectionExecNode {
1878                    input: Some(Box::new(input)),
1879                    expr,
1880                    expr_name,
1881                },
1882            ))),
1883        })
1884    }
1885
1886    fn try_from_analyze_exec(
1887        exec: &AnalyzeExec,
1888        extension_codec: &dyn PhysicalExtensionCodec,
1889    ) -> Result<Self> {
1890        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1891            exec.input().to_owned(),
1892            extension_codec,
1893        )?;
1894        Ok(protobuf::PhysicalPlanNode {
1895            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1896                protobuf::AnalyzeExecNode {
1897                    verbose: exec.verbose(),
1898                    show_statistics: exec.show_statistics(),
1899                    input: Some(Box::new(input)),
1900                    schema: Some(exec.schema().as_ref().try_into()?),
1901                },
1902            ))),
1903        })
1904    }
1905
1906    fn try_from_filter_exec(
1907        exec: &FilterExec,
1908        extension_codec: &dyn PhysicalExtensionCodec,
1909    ) -> Result<Self> {
1910        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1911            exec.input().to_owned(),
1912            extension_codec,
1913        )?;
1914        Ok(protobuf::PhysicalPlanNode {
1915            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1916                protobuf::FilterExecNode {
1917                    input: Some(Box::new(input)),
1918                    expr: Some(serialize_physical_expr(
1919                        exec.predicate(),
1920                        extension_codec,
1921                    )?),
1922                    default_filter_selectivity: exec.default_selectivity() as u32,
1923                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
1924                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1925                    }),
1926                },
1927            ))),
1928        })
1929    }
1930
1931    fn try_from_global_limit_exec(
1932        limit: &GlobalLimitExec,
1933        extension_codec: &dyn PhysicalExtensionCodec,
1934    ) -> Result<Self> {
1935        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1936            limit.input().to_owned(),
1937            extension_codec,
1938        )?;
1939
1940        Ok(protobuf::PhysicalPlanNode {
1941            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1942                protobuf::GlobalLimitExecNode {
1943                    input: Some(Box::new(input)),
1944                    skip: limit.skip() as u32,
1945                    fetch: match limit.fetch() {
1946                        Some(n) => n as i64,
1947                        _ => -1, // no limit
1948                    },
1949                },
1950            ))),
1951        })
1952    }
1953
1954    fn try_from_local_limit_exec(
1955        limit: &LocalLimitExec,
1956        extension_codec: &dyn PhysicalExtensionCodec,
1957    ) -> Result<Self> {
1958        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1959            limit.input().to_owned(),
1960            extension_codec,
1961        )?;
1962        Ok(protobuf::PhysicalPlanNode {
1963            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1964                protobuf::LocalLimitExecNode {
1965                    input: Some(Box::new(input)),
1966                    fetch: limit.fetch() as u32,
1967                },
1968            ))),
1969        })
1970    }
1971
1972    fn try_from_hash_join_exec(
1973        exec: &HashJoinExec,
1974        extension_codec: &dyn PhysicalExtensionCodec,
1975    ) -> Result<Self> {
1976        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1977            exec.left().to_owned(),
1978            extension_codec,
1979        )?;
1980        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1981            exec.right().to_owned(),
1982            extension_codec,
1983        )?;
1984        let on: Vec<protobuf::JoinOn> = exec
1985            .on()
1986            .iter()
1987            .map(|tuple| {
1988                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1989                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1990                Ok::<_, DataFusionError>(protobuf::JoinOn {
1991                    left: Some(l),
1992                    right: Some(r),
1993                })
1994            })
1995            .collect::<Result<_>>()?;
1996        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1997        let null_equality: protobuf::NullEquality = exec.null_equality().into();
1998        let filter = exec
1999            .filter()
2000            .as_ref()
2001            .map(|f| {
2002                let expression =
2003                    serialize_physical_expr(f.expression(), extension_codec)?;
2004                let column_indices = f
2005                    .column_indices()
2006                    .iter()
2007                    .map(|i| {
2008                        let side: protobuf::JoinSide = i.side.to_owned().into();
2009                        protobuf::ColumnIndex {
2010                            index: i.index as u32,
2011                            side: side.into(),
2012                        }
2013                    })
2014                    .collect();
2015                let schema = f.schema().as_ref().try_into()?;
2016                Ok(protobuf::JoinFilter {
2017                    expression: Some(expression),
2018                    column_indices,
2019                    schema: Some(schema),
2020                })
2021            })
2022            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2023
2024        let partition_mode = match exec.partition_mode() {
2025            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2026            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2027            PartitionMode::Auto => protobuf::PartitionMode::Auto,
2028        };
2029
2030        Ok(protobuf::PhysicalPlanNode {
2031            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2032                protobuf::HashJoinExecNode {
2033                    left: Some(Box::new(left)),
2034                    right: Some(Box::new(right)),
2035                    on,
2036                    join_type: join_type.into(),
2037                    partition_mode: partition_mode.into(),
2038                    null_equality: null_equality.into(),
2039                    filter,
2040                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2041                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2042                    }),
2043                },
2044            ))),
2045        })
2046    }
2047
2048    fn try_from_symmetric_hash_join_exec(
2049        exec: &SymmetricHashJoinExec,
2050        extension_codec: &dyn PhysicalExtensionCodec,
2051    ) -> Result<Self> {
2052        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2053            exec.left().to_owned(),
2054            extension_codec,
2055        )?;
2056        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2057            exec.right().to_owned(),
2058            extension_codec,
2059        )?;
2060        let on = exec
2061            .on()
2062            .iter()
2063            .map(|tuple| {
2064                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2065                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2066                Ok::<_, DataFusionError>(protobuf::JoinOn {
2067                    left: Some(l),
2068                    right: Some(r),
2069                })
2070            })
2071            .collect::<Result<_>>()?;
2072        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2073        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2074        let filter = exec
2075            .filter()
2076            .as_ref()
2077            .map(|f| {
2078                let expression =
2079                    serialize_physical_expr(f.expression(), extension_codec)?;
2080                let column_indices = f
2081                    .column_indices()
2082                    .iter()
2083                    .map(|i| {
2084                        let side: protobuf::JoinSide = i.side.to_owned().into();
2085                        protobuf::ColumnIndex {
2086                            index: i.index as u32,
2087                            side: side.into(),
2088                        }
2089                    })
2090                    .collect();
2091                let schema = f.schema().as_ref().try_into()?;
2092                Ok(protobuf::JoinFilter {
2093                    expression: Some(expression),
2094                    column_indices,
2095                    schema: Some(schema),
2096                })
2097            })
2098            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2099
2100        let partition_mode = match exec.partition_mode() {
2101            StreamJoinPartitionMode::SinglePartition => {
2102                protobuf::StreamPartitionMode::SinglePartition
2103            }
2104            StreamJoinPartitionMode::Partitioned => {
2105                protobuf::StreamPartitionMode::PartitionedExec
2106            }
2107        };
2108
2109        let left_sort_exprs = exec
2110            .left_sort_exprs()
2111            .map(|exprs| {
2112                exprs
2113                    .iter()
2114                    .map(|expr| {
2115                        Ok(protobuf::PhysicalSortExprNode {
2116                            expr: Some(Box::new(serialize_physical_expr(
2117                                &expr.expr,
2118                                extension_codec,
2119                            )?)),
2120                            asc: !expr.options.descending,
2121                            nulls_first: expr.options.nulls_first,
2122                        })
2123                    })
2124                    .collect::<Result<Vec<_>>>()
2125            })
2126            .transpose()?
2127            .unwrap_or(vec![]);
2128
2129        let right_sort_exprs = exec
2130            .right_sort_exprs()
2131            .map(|exprs| {
2132                exprs
2133                    .iter()
2134                    .map(|expr| {
2135                        Ok(protobuf::PhysicalSortExprNode {
2136                            expr: Some(Box::new(serialize_physical_expr(
2137                                &expr.expr,
2138                                extension_codec,
2139                            )?)),
2140                            asc: !expr.options.descending,
2141                            nulls_first: expr.options.nulls_first,
2142                        })
2143                    })
2144                    .collect::<Result<Vec<_>>>()
2145            })
2146            .transpose()?
2147            .unwrap_or(vec![]);
2148
2149        Ok(protobuf::PhysicalPlanNode {
2150            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2151                protobuf::SymmetricHashJoinExecNode {
2152                    left: Some(Box::new(left)),
2153                    right: Some(Box::new(right)),
2154                    on,
2155                    join_type: join_type.into(),
2156                    partition_mode: partition_mode.into(),
2157                    null_equality: null_equality.into(),
2158                    left_sort_exprs,
2159                    right_sort_exprs,
2160                    filter,
2161                },
2162            ))),
2163        })
2164    }
2165
2166    fn try_from_cross_join_exec(
2167        exec: &CrossJoinExec,
2168        extension_codec: &dyn PhysicalExtensionCodec,
2169    ) -> Result<Self> {
2170        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2171            exec.left().to_owned(),
2172            extension_codec,
2173        )?;
2174        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2175            exec.right().to_owned(),
2176            extension_codec,
2177        )?;
2178        Ok(protobuf::PhysicalPlanNode {
2179            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2180                protobuf::CrossJoinExecNode {
2181                    left: Some(Box::new(left)),
2182                    right: Some(Box::new(right)),
2183                },
2184            ))),
2185        })
2186    }
2187
2188    fn try_from_aggregate_exec(
2189        exec: &AggregateExec,
2190        extension_codec: &dyn PhysicalExtensionCodec,
2191    ) -> Result<Self> {
2192        let groups: Vec<bool> = exec
2193            .group_expr()
2194            .groups()
2195            .iter()
2196            .flatten()
2197            .copied()
2198            .collect();
2199
2200        let group_names = exec
2201            .group_expr()
2202            .expr()
2203            .iter()
2204            .map(|expr| expr.1.to_owned())
2205            .collect();
2206
2207        let filter = exec
2208            .filter_expr()
2209            .iter()
2210            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2211            .collect::<Result<Vec<_>>>()?;
2212
2213        let agg = exec
2214            .aggr_expr()
2215            .iter()
2216            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2217            .collect::<Result<Vec<_>>>()?;
2218
2219        let agg_names = exec
2220            .aggr_expr()
2221            .iter()
2222            .map(|expr| expr.name().to_string())
2223            .collect::<Vec<_>>();
2224
2225        let agg_mode = match exec.mode() {
2226            AggregateMode::Partial => protobuf::AggregateMode::Partial,
2227            AggregateMode::Final => protobuf::AggregateMode::Final,
2228            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2229            AggregateMode::Single => protobuf::AggregateMode::Single,
2230            AggregateMode::SinglePartitioned => {
2231                protobuf::AggregateMode::SinglePartitioned
2232            }
2233        };
2234        let input_schema = exec.input_schema();
2235        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2236            exec.input().to_owned(),
2237            extension_codec,
2238        )?;
2239
2240        let null_expr = exec
2241            .group_expr()
2242            .null_expr()
2243            .iter()
2244            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2245            .collect::<Result<Vec<_>>>()?;
2246
2247        let group_expr = exec
2248            .group_expr()
2249            .expr()
2250            .iter()
2251            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2252            .collect::<Result<Vec<_>>>()?;
2253
2254        let limit = exec.limit().map(|value| protobuf::AggLimit {
2255            limit: value as u64,
2256        });
2257
2258        Ok(protobuf::PhysicalPlanNode {
2259            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2260                protobuf::AggregateExecNode {
2261                    group_expr,
2262                    group_expr_name: group_names,
2263                    aggr_expr: agg,
2264                    filter_expr: filter,
2265                    aggr_expr_name: agg_names,
2266                    mode: agg_mode as i32,
2267                    input: Some(Box::new(input)),
2268                    input_schema: Some(input_schema.as_ref().try_into()?),
2269                    null_expr,
2270                    groups,
2271                    limit,
2272                },
2273            ))),
2274        })
2275    }
2276
2277    fn try_from_empty_exec(
2278        empty: &EmptyExec,
2279        _extension_codec: &dyn PhysicalExtensionCodec,
2280    ) -> Result<Self> {
2281        let schema = empty.schema().as_ref().try_into()?;
2282        Ok(protobuf::PhysicalPlanNode {
2283            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2284                schema: Some(schema),
2285            })),
2286        })
2287    }
2288
2289    fn try_from_placeholder_row_exec(
2290        empty: &PlaceholderRowExec,
2291        _extension_codec: &dyn PhysicalExtensionCodec,
2292    ) -> Result<Self> {
2293        let schema = empty.schema().as_ref().try_into()?;
2294        Ok(protobuf::PhysicalPlanNode {
2295            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2296                protobuf::PlaceholderRowExecNode {
2297                    schema: Some(schema),
2298                },
2299            )),
2300        })
2301    }
2302
2303    fn try_from_coalesce_batches_exec(
2304        coalesce_batches: &CoalesceBatchesExec,
2305        extension_codec: &dyn PhysicalExtensionCodec,
2306    ) -> Result<Self> {
2307        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2308            coalesce_batches.input().to_owned(),
2309            extension_codec,
2310        )?;
2311        Ok(protobuf::PhysicalPlanNode {
2312            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2313                protobuf::CoalesceBatchesExecNode {
2314                    input: Some(Box::new(input)),
2315                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2316                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2317                },
2318            ))),
2319        })
2320    }
2321
2322    fn try_from_data_source_exec(
2323        data_source_exec: &DataSourceExec,
2324        extension_codec: &dyn PhysicalExtensionCodec,
2325    ) -> Result<Option<Self>> {
2326        let data_source = data_source_exec.data_source();
2327        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2328            let source = maybe_csv.file_source();
2329            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2330                return Ok(Some(protobuf::PhysicalPlanNode {
2331                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
2332                        protobuf::CsvScanExecNode {
2333                            base_conf: Some(serialize_file_scan_config(
2334                                maybe_csv,
2335                                extension_codec,
2336                            )?),
2337                            has_header: csv_config.has_header(),
2338                            delimiter: byte_to_string(
2339                                csv_config.delimiter(),
2340                                "delimiter",
2341                            )?,
2342                            quote: byte_to_string(csv_config.quote(), "quote")?,
2343                            optional_escape: if let Some(escape) = csv_config.escape() {
2344                                Some(
2345                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2346                                        byte_to_string(escape, "escape")?,
2347                                    ),
2348                                )
2349                            } else {
2350                                None
2351                            },
2352                            optional_comment: if let Some(comment) = csv_config.comment()
2353                            {
2354                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2355                                        byte_to_string(comment, "comment")?,
2356                                    ))
2357                            } else {
2358                                None
2359                            },
2360                            newlines_in_values: maybe_csv.newlines_in_values(),
2361                        },
2362                    )),
2363                }));
2364            }
2365        }
2366
2367        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2368            let source = scan_conf.file_source();
2369            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2370                return Ok(Some(protobuf::PhysicalPlanNode {
2371                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
2372                        protobuf::JsonScanExecNode {
2373                            base_conf: Some(serialize_file_scan_config(
2374                                scan_conf,
2375                                extension_codec,
2376                            )?),
2377                        },
2378                    )),
2379                }));
2380            }
2381        }
2382
2383        #[cfg(feature = "parquet")]
2384        if let Some((maybe_parquet, conf)) =
2385            data_source_exec.downcast_to_file_source::<ParquetSource>()
2386        {
2387            let predicate = conf
2388                .predicate()
2389                .map(|pred| serialize_physical_expr(pred, extension_codec))
2390                .transpose()?;
2391            return Ok(Some(protobuf::PhysicalPlanNode {
2392                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2393                    protobuf::ParquetScanExecNode {
2394                        base_conf: Some(serialize_file_scan_config(
2395                            maybe_parquet,
2396                            extension_codec,
2397                        )?),
2398                        predicate,
2399                        parquet_options: Some(conf.table_parquet_options().try_into()?),
2400                    },
2401                )),
2402            }));
2403        }
2404
2405        #[cfg(feature = "avro")]
2406        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2407            let source = maybe_avro.file_source();
2408            if source.as_any().downcast_ref::<AvroSource>().is_some() {
2409                return Ok(Some(protobuf::PhysicalPlanNode {
2410                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
2411                        protobuf::AvroScanExecNode {
2412                            base_conf: Some(serialize_file_scan_config(
2413                                maybe_avro,
2414                                extension_codec,
2415                            )?),
2416                        },
2417                    )),
2418                }));
2419            }
2420        }
2421
2422        Ok(None)
2423    }
2424
2425    fn try_from_coalesce_partitions_exec(
2426        exec: &CoalescePartitionsExec,
2427        extension_codec: &dyn PhysicalExtensionCodec,
2428    ) -> Result<Self> {
2429        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2430            exec.input().to_owned(),
2431            extension_codec,
2432        )?;
2433        Ok(protobuf::PhysicalPlanNode {
2434            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2435                protobuf::CoalescePartitionsExecNode {
2436                    input: Some(Box::new(input)),
2437                    fetch: exec.fetch().map(|f| f as u32),
2438                },
2439            ))),
2440        })
2441    }
2442
2443    fn try_from_repartition_exec(
2444        exec: &RepartitionExec,
2445        extension_codec: &dyn PhysicalExtensionCodec,
2446    ) -> Result<Self> {
2447        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2448            exec.input().to_owned(),
2449            extension_codec,
2450        )?;
2451
2452        let pb_partitioning =
2453            serialize_partitioning(exec.partitioning(), extension_codec)?;
2454
2455        Ok(protobuf::PhysicalPlanNode {
2456            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2457                protobuf::RepartitionExecNode {
2458                    input: Some(Box::new(input)),
2459                    partitioning: Some(pb_partitioning),
2460                },
2461            ))),
2462        })
2463    }
2464
2465    fn try_from_sort_exec(
2466        exec: &SortExec,
2467        extension_codec: &dyn PhysicalExtensionCodec,
2468    ) -> Result<Self> {
2469        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2470            exec.input().to_owned(),
2471            extension_codec,
2472        )?;
2473        let expr = exec
2474            .expr()
2475            .iter()
2476            .map(|expr| {
2477                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2478                    expr: Some(Box::new(serialize_physical_expr(
2479                        &expr.expr,
2480                        extension_codec,
2481                    )?)),
2482                    asc: !expr.options.descending,
2483                    nulls_first: expr.options.nulls_first,
2484                });
2485                Ok(protobuf::PhysicalExprNode {
2486                    expr_type: Some(ExprType::Sort(sort_expr)),
2487                })
2488            })
2489            .collect::<Result<Vec<_>>>()?;
2490        Ok(protobuf::PhysicalPlanNode {
2491            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2492                protobuf::SortExecNode {
2493                    input: Some(Box::new(input)),
2494                    expr,
2495                    fetch: match exec.fetch() {
2496                        Some(n) => n as i64,
2497                        _ => -1,
2498                    },
2499                    preserve_partitioning: exec.preserve_partitioning(),
2500                },
2501            ))),
2502        })
2503    }
2504
2505    fn try_from_union_exec(
2506        union: &UnionExec,
2507        extension_codec: &dyn PhysicalExtensionCodec,
2508    ) -> Result<Self> {
2509        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2510        for input in union.inputs() {
2511            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2512                input.to_owned(),
2513                extension_codec,
2514            )?);
2515        }
2516        Ok(protobuf::PhysicalPlanNode {
2517            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2518                inputs,
2519            })),
2520        })
2521    }
2522
2523    fn try_from_interleave_exec(
2524        interleave: &InterleaveExec,
2525        extension_codec: &dyn PhysicalExtensionCodec,
2526    ) -> Result<Self> {
2527        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2528        for input in interleave.inputs() {
2529            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2530                input.to_owned(),
2531                extension_codec,
2532            )?);
2533        }
2534        Ok(protobuf::PhysicalPlanNode {
2535            physical_plan_type: Some(PhysicalPlanType::Interleave(
2536                protobuf::InterleaveExecNode { inputs },
2537            )),
2538        })
2539    }
2540
2541    fn try_from_sort_preserving_merge_exec(
2542        exec: &SortPreservingMergeExec,
2543        extension_codec: &dyn PhysicalExtensionCodec,
2544    ) -> Result<Self> {
2545        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2546            exec.input().to_owned(),
2547            extension_codec,
2548        )?;
2549        let expr = exec
2550            .expr()
2551            .iter()
2552            .map(|expr| {
2553                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2554                    expr: Some(Box::new(serialize_physical_expr(
2555                        &expr.expr,
2556                        extension_codec,
2557                    )?)),
2558                    asc: !expr.options.descending,
2559                    nulls_first: expr.options.nulls_first,
2560                });
2561                Ok(protobuf::PhysicalExprNode {
2562                    expr_type: Some(ExprType::Sort(sort_expr)),
2563                })
2564            })
2565            .collect::<Result<Vec<_>>>()?;
2566        Ok(protobuf::PhysicalPlanNode {
2567            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2568                protobuf::SortPreservingMergeExecNode {
2569                    input: Some(Box::new(input)),
2570                    expr,
2571                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2572                },
2573            ))),
2574        })
2575    }
2576
2577    fn try_from_nested_loop_join_exec(
2578        exec: &NestedLoopJoinExec,
2579        extension_codec: &dyn PhysicalExtensionCodec,
2580    ) -> Result<Self> {
2581        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2582            exec.left().to_owned(),
2583            extension_codec,
2584        )?;
2585        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2586            exec.right().to_owned(),
2587            extension_codec,
2588        )?;
2589
2590        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2591        let filter = exec
2592            .filter()
2593            .as_ref()
2594            .map(|f| {
2595                let expression =
2596                    serialize_physical_expr(f.expression(), extension_codec)?;
2597                let column_indices = f
2598                    .column_indices()
2599                    .iter()
2600                    .map(|i| {
2601                        let side: protobuf::JoinSide = i.side.to_owned().into();
2602                        protobuf::ColumnIndex {
2603                            index: i.index as u32,
2604                            side: side.into(),
2605                        }
2606                    })
2607                    .collect();
2608                let schema = f.schema().as_ref().try_into()?;
2609                Ok(protobuf::JoinFilter {
2610                    expression: Some(expression),
2611                    column_indices,
2612                    schema: Some(schema),
2613                })
2614            })
2615            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2616
2617        Ok(protobuf::PhysicalPlanNode {
2618            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2619                protobuf::NestedLoopJoinExecNode {
2620                    left: Some(Box::new(left)),
2621                    right: Some(Box::new(right)),
2622                    join_type: join_type.into(),
2623                    filter,
2624                    projection: exec.projection().map_or_else(Vec::new, |v| {
2625                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2626                    }),
2627                },
2628            ))),
2629        })
2630    }
2631
2632    fn try_from_window_agg_exec(
2633        exec: &WindowAggExec,
2634        extension_codec: &dyn PhysicalExtensionCodec,
2635    ) -> Result<Self> {
2636        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2637            exec.input().to_owned(),
2638            extension_codec,
2639        )?;
2640
2641        let window_expr = exec
2642            .window_expr()
2643            .iter()
2644            .map(|e| serialize_physical_window_expr(e, extension_codec))
2645            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2646
2647        let partition_keys = exec
2648            .partition_keys()
2649            .iter()
2650            .map(|e| serialize_physical_expr(e, extension_codec))
2651            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2652
2653        Ok(protobuf::PhysicalPlanNode {
2654            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2655                protobuf::WindowAggExecNode {
2656                    input: Some(Box::new(input)),
2657                    window_expr,
2658                    partition_keys,
2659                    input_order_mode: None,
2660                },
2661            ))),
2662        })
2663    }
2664
2665    fn try_from_bounded_window_agg_exec(
2666        exec: &BoundedWindowAggExec,
2667        extension_codec: &dyn PhysicalExtensionCodec,
2668    ) -> Result<Self> {
2669        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2670            exec.input().to_owned(),
2671            extension_codec,
2672        )?;
2673
2674        let window_expr = exec
2675            .window_expr()
2676            .iter()
2677            .map(|e| serialize_physical_window_expr(e, extension_codec))
2678            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2679
2680        let partition_keys = exec
2681            .partition_keys()
2682            .iter()
2683            .map(|e| serialize_physical_expr(e, extension_codec))
2684            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2685
2686        let input_order_mode = match &exec.input_order_mode {
2687            InputOrderMode::Linear => {
2688                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2689            }
2690            InputOrderMode::PartiallySorted(columns) => {
2691                window_agg_exec_node::InputOrderMode::PartiallySorted(
2692                    protobuf::PartiallySortedInputOrderMode {
2693                        columns: columns.iter().map(|c| *c as u64).collect(),
2694                    },
2695                )
2696            }
2697            InputOrderMode::Sorted => {
2698                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2699            }
2700        };
2701
2702        Ok(protobuf::PhysicalPlanNode {
2703            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2704                protobuf::WindowAggExecNode {
2705                    input: Some(Box::new(input)),
2706                    window_expr,
2707                    partition_keys,
2708                    input_order_mode: Some(input_order_mode),
2709                },
2710            ))),
2711        })
2712    }
2713
2714    fn try_from_data_sink_exec(
2715        exec: &DataSinkExec,
2716        extension_codec: &dyn PhysicalExtensionCodec,
2717    ) -> Result<Option<Self>> {
2718        let input: protobuf::PhysicalPlanNode =
2719            protobuf::PhysicalPlanNode::try_from_physical_plan(
2720                exec.input().to_owned(),
2721                extension_codec,
2722            )?;
2723        let sort_order = match exec.sort_order() {
2724            Some(requirements) => {
2725                let expr = requirements
2726                    .iter()
2727                    .map(|requirement| {
2728                        let expr: PhysicalSortExpr = requirement.to_owned().into();
2729                        let sort_expr = protobuf::PhysicalSortExprNode {
2730                            expr: Some(Box::new(serialize_physical_expr(
2731                                &expr.expr,
2732                                extension_codec,
2733                            )?)),
2734                            asc: !expr.options.descending,
2735                            nulls_first: expr.options.nulls_first,
2736                        };
2737                        Ok(sort_expr)
2738                    })
2739                    .collect::<Result<Vec<_>>>()?;
2740                Some(protobuf::PhysicalSortExprNodeCollection {
2741                    physical_sort_expr_nodes: expr,
2742                })
2743            }
2744            None => None,
2745        };
2746
2747        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
2748            return Ok(Some(protobuf::PhysicalPlanNode {
2749                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
2750                    protobuf::JsonSinkExecNode {
2751                        input: Some(Box::new(input)),
2752                        sink: Some(sink.try_into()?),
2753                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2754                        sort_order,
2755                    },
2756                ))),
2757            }));
2758        }
2759
2760        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
2761            return Ok(Some(protobuf::PhysicalPlanNode {
2762                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
2763                    protobuf::CsvSinkExecNode {
2764                        input: Some(Box::new(input)),
2765                        sink: Some(sink.try_into()?),
2766                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2767                        sort_order,
2768                    },
2769                ))),
2770            }));
2771        }
2772
2773        #[cfg(feature = "parquet")]
2774        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
2775            return Ok(Some(protobuf::PhysicalPlanNode {
2776                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
2777                    protobuf::ParquetSinkExecNode {
2778                        input: Some(Box::new(input)),
2779                        sink: Some(sink.try_into()?),
2780                        sink_schema: Some(exec.schema().as_ref().try_into()?),
2781                        sort_order,
2782                    },
2783                ))),
2784            }));
2785        }
2786
2787        // If unknown DataSink then let extension handle it
2788        Ok(None)
2789    }
2790
2791    fn try_from_unnest_exec(
2792        exec: &UnnestExec,
2793        extension_codec: &dyn PhysicalExtensionCodec,
2794    ) -> Result<Self> {
2795        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2796            exec.input().to_owned(),
2797            extension_codec,
2798        )?;
2799
2800        Ok(protobuf::PhysicalPlanNode {
2801            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2802                protobuf::UnnestExecNode {
2803                    input: Some(Box::new(input)),
2804                    schema: Some(exec.schema().try_into()?),
2805                    list_type_columns: exec
2806                        .list_column_indices()
2807                        .iter()
2808                        .map(|c| ProtoListUnnest {
2809                            index_in_input_schema: c.index_in_input_schema as _,
2810                            depth: c.depth as _,
2811                        })
2812                        .collect(),
2813                    struct_type_columns: exec
2814                        .struct_column_indices()
2815                        .iter()
2816                        .map(|c| *c as _)
2817                        .collect(),
2818                    options: Some(exec.options().into()),
2819                },
2820            ))),
2821        })
2822    }
2823
2824    fn try_from_cooperative_exec(
2825        exec: &CooperativeExec,
2826        extension_codec: &dyn PhysicalExtensionCodec,
2827    ) -> Result<Self> {
2828        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2829            exec.input().to_owned(),
2830            extension_codec,
2831        )?;
2832
2833        Ok(protobuf::PhysicalPlanNode {
2834            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
2835                protobuf::CooperativeExecNode {
2836                    input: Some(Box::new(input)),
2837                },
2838            ))),
2839        })
2840    }
2841}
2842
/// A serializable representation of an [`ExecutionPlan`].
///
/// Implementors can be decoded from and encoded to a byte buffer, and
/// converted to and from an in-memory `Arc<dyn ExecutionPlan>`.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Builds a value of this type from the bytes in `buf`.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Writes the encoded form of `self` into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts `self` into an executable plan.
    ///
    /// `registry`, `runtime` and `extension_codec` are supplied by the
    /// caller; presumably they resolve functions, runtime resources and
    /// custom nodes respectively — confirm against the implementation.
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds a value of this type from an executable `plan`, using
    /// `extension_codec` where the implementation needs it.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
2867
/// Hooks for encoding and decoding plan nodes, expressions and user-defined
/// functions that the built-in serialization does not handle itself.
///
/// Only the plan-level `try_decode` / `try_encode` are required. Every other
/// method has a default: the `try_decode_*` defaults and `try_encode_expr`
/// return a not-implemented error, while the `try_encode_ud*f` defaults
/// succeed without writing anything.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes an execution plan from `buf`.
    ///
    /// `inputs` are presumably the already-decoded child plans of the node —
    /// confirm against callers.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes the execution plan `node` into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes the scalar UDF called `name`.
    ///
    /// The default ignores the buffer and returns a not-implemented error
    /// that mentions `name`.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF.
    ///
    /// The default returns `Ok(())` and leaves the buffer untouched.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a physical expression.
    ///
    /// The default ignores its arguments and returns a not-implemented error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a physical expression.
    ///
    /// The default ignores its arguments and returns a not-implemented error.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes the aggregate UDF called `name`.
    ///
    /// The default ignores the buffer and returns a not-implemented error
    /// that mentions `name`.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF.
    ///
    /// The default returns `Ok(())` and leaves the buffer untouched.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window UDF called `name`.
    ///
    /// The default ignores the buffer and returns a not-implemented error
    /// that mentions `name`.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF.
    ///
    /// The default returns `Ok(())` and leaves the buffer untouched.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
2920
/// A field-less [`PhysicalExtensionCodec`] for callers that have no custom
/// plan nodes: its plan-level `try_decode` and `try_encode` always return a
/// not-implemented error.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
2923
// Only the two required methods are implemented here; all other codec
// methods fall back to the trait's default implementations.
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    /// Always fails: this codec cannot decode extension plan nodes.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Always fails: this codec cannot encode extension plan nodes.
    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
2942
2943fn into_physical_plan(
2944    node: &Option<Box<protobuf::PhysicalPlanNode>>,
2945    registry: &dyn FunctionRegistry,
2946    runtime: &RuntimeEnv,
2947    extension_codec: &dyn PhysicalExtensionCodec,
2948) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2949    if let Some(field) = node {
2950        field.try_into_physical_plan(registry, runtime, extension_codec)
2951    } else {
2952        Err(proto_error("Missing required field in protobuf"))
2953    }
2954}