datafusion_proto/physical_plan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26    parse_physical_window_expr, parse_protobuf_file_scan_config, parse_record_batches,
27    parse_table_schema_from_proto,
28};
29use crate::physical_plan::to_proto::{
30    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31    serialize_physical_sort_exprs, serialize_physical_window_expr,
32    serialize_record_batches,
33};
34use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
35use crate::protobuf::physical_expr_node::ExprType;
36use crate::protobuf::physical_plan_node::PhysicalPlanType;
37use crate::protobuf::{
38    self, ListUnnest as ProtoListUnnest, SortExprNode, SortMergeJoinExecNode,
39    proto_error, window_agg_exec_node,
40};
41use crate::{convert_required, into_required};
42
43use arrow::compute::SortOptions;
44use arrow::datatypes::{IntervalMonthDayNanoType, SchemaRef};
45use datafusion_catalog::memory::MemorySourceConfig;
46use datafusion_common::config::CsvOptions;
47use datafusion_common::{
48    DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
49};
50#[cfg(feature = "parquet")]
51use datafusion_datasource::file::FileSource;
52use datafusion_datasource::file_compression_type::FileCompressionType;
53use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
54use datafusion_datasource::sink::DataSinkExec;
55use datafusion_datasource::source::{DataSource, DataSourceExec};
56#[cfg(feature = "avro")]
57use datafusion_datasource_avro::source::AvroSource;
58use datafusion_datasource_csv::file_format::CsvSink;
59use datafusion_datasource_csv::source::CsvSource;
60use datafusion_datasource_json::file_format::JsonSink;
61use datafusion_datasource_json::source::JsonSource;
62#[cfg(feature = "parquet")]
63use datafusion_datasource_parquet::file_format::ParquetSink;
64#[cfg(feature = "parquet")]
65use datafusion_datasource_parquet::source::ParquetSource;
66use datafusion_execution::{FunctionRegistry, TaskContext};
67use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
68use datafusion_functions_table::generate_series::{
69    Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
70};
71use datafusion_physical_expr::aggregate::AggregateExprBuilder;
72use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
73use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
74use datafusion_physical_plan::aggregates::AggregateMode;
75use datafusion_physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
76use datafusion_physical_plan::analyze::AnalyzeExec;
77use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
78use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
79use datafusion_physical_plan::coop::CooperativeExec;
80use datafusion_physical_plan::empty::EmptyExec;
81use datafusion_physical_plan::explain::ExplainExec;
82use datafusion_physical_plan::expressions::PhysicalSortExpr;
83use datafusion_physical_plan::filter::FilterExec;
84use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
85use datafusion_physical_plan::joins::{
86    CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode,
87    SymmetricHashJoinExec,
88};
89use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
90use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
91use datafusion_physical_plan::memory::LazyMemoryExec;
92use datafusion_physical_plan::metrics::MetricType;
93use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
94use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
95use datafusion_physical_plan::repartition::RepartitionExec;
96use datafusion_physical_plan::sorts::sort::SortExec;
97use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
98use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
99use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
100use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
101use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
102
103use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
104use datafusion_physical_plan::async_func::AsyncFuncExec;
105use prost::Message;
106use prost::bytes::BufMut;
107
108pub mod from_proto;
109pub mod to_proto;
110
// Serialization round-trip between `protobuf::PhysicalPlanNode` and a
// DataFusion `ExecutionPlan` tree. Decoding walks the protobuf `oneof` and
// dispatches to a per-variant constructor; encoding downcasts the concrete
// operator type and dispatches to a per-operator serializer, falling back to
// the user-supplied extension codec for operators not known to this crate.
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
    /// Decode a `PhysicalPlanNode` from its protobuf wire representation.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
            internal_datafusion_err!("failed to decode physical plan: {e:?}")
        })
    }

    /// Encode this node into `buf` using the protobuf wire representation.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized,
    {
        self.encode(buf).map_err(|e| {
            internal_datafusion_err!("failed to encode physical plan: {e:?}")
        })
    }

    /// Convert this protobuf node (and, recursively via the per-variant
    /// helpers, its children) into an [`ExecutionPlan`].
    ///
    /// `extension_codec` decodes any `Extension` variants produced by
    /// user-defined operators. Returns a proto error if the `oneof` plan
    /// type field was not set in the decoded message.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The `oneof` is optional at the protobuf level; a missing variant
        // means the message is malformed (or from an incompatible version).
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        // One arm per supported operator; each helper rebuilds its children
        // first and then reconstructs the operator around them.
        match plan {
            PhysicalPlanType::Explain(explain) => {
                self.try_into_explain_physical_plan(explain, ctx, extension_codec)
            }
            PhysicalPlanType::Projection(projection) => {
                self.try_into_projection_physical_plan(projection, ctx, extension_codec)
            }
            PhysicalPlanType::Filter(filter) => {
                self.try_into_filter_physical_plan(filter, ctx, extension_codec)
            }
            PhysicalPlanType::CsvScan(scan) => {
                self.try_into_csv_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::JsonScan(scan) => {
                self.try_into_json_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::ParquetScan(scan) => {
                self.try_into_parquet_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::AvroScan(scan) => {
                self.try_into_avro_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::MemoryScan(scan) => {
                self.try_into_memory_scan_physical_plan(scan, ctx, extension_codec)
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Merge(merge) => {
                self.try_into_merge_physical_plan(merge, ctx, extension_codec)
            }
            PhysicalPlanType::Repartition(repart) => {
                self.try_into_repartition_physical_plan(repart, ctx, extension_codec)
            }
            PhysicalPlanType::GlobalLimit(limit) => {
                self.try_into_global_limit_physical_plan(limit, ctx, extension_codec)
            }
            PhysicalPlanType::LocalLimit(limit) => {
                self.try_into_local_limit_physical_plan(limit, ctx, extension_codec)
            }
            PhysicalPlanType::Window(window_agg) => {
                self.try_into_window_physical_plan(window_agg, ctx, extension_codec)
            }
            PhysicalPlanType::Aggregate(hash_agg) => {
                self.try_into_aggregate_physical_plan(hash_agg, ctx, extension_codec)
            }
            PhysicalPlanType::HashJoin(hashjoin) => {
                self.try_into_hash_join_physical_plan(hashjoin, ctx, extension_codec)
            }
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Union(union) => {
                self.try_into_union_physical_plan(union, ctx, extension_codec)
            }
            PhysicalPlanType::Interleave(interleave) => {
                self.try_into_interleave_physical_plan(interleave, ctx, extension_codec)
            }
            PhysicalPlanType::CrossJoin(crossjoin) => {
                self.try_into_cross_join_physical_plan(crossjoin, ctx, extension_codec)
            }
            PhysicalPlanType::Empty(empty) => {
                self.try_into_empty_physical_plan(empty, ctx, extension_codec)
            }
            PhysicalPlanType::PlaceholderRow(placeholder) => self
                .try_into_placeholder_row_physical_plan(
                    placeholder,
                    ctx,
                    extension_codec,
                ),
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, ctx, extension_codec)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(sort, ctx, extension_codec),
            PhysicalPlanType::Extension(extension) => {
                self.try_into_extension_physical_plan(extension, ctx, extension_codec)
            }
            PhysicalPlanType::NestedLoopJoin(join) => {
                self.try_into_nested_loop_join_physical_plan(join, ctx, extension_codec)
            }
            PhysicalPlanType::Analyze(analyze) => {
                self.try_into_analyze_physical_plan(analyze, ctx, extension_codec)
            }
            PhysicalPlanType::JsonSink(sink) => {
                self.try_into_json_sink_physical_plan(sink, ctx, extension_codec)
            }
            PhysicalPlanType::CsvSink(sink) => {
                self.try_into_csv_sink_physical_plan(sink, ctx, extension_codec)
            }
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => {
                self.try_into_parquet_sink_physical_plan(sink, ctx, extension_codec)
            }
            PhysicalPlanType::Unnest(unnest) => {
                self.try_into_unnest_physical_plan(unnest, ctx, extension_codec)
            }
            PhysicalPlanType::Cooperative(cooperative) => {
                self.try_into_cooperative_physical_plan(cooperative, ctx, extension_codec)
            }
            PhysicalPlanType::GenerateSeries(generate_series) => {
                self.try_into_generate_series_physical_plan(generate_series)
            }
            PhysicalPlanType::SortMergeJoin(sort_join) => {
                self.try_into_sort_join(sort_join, ctx, extension_codec)
            }
            PhysicalPlanType::AsyncFunc(async_func) => {
                self.try_into_async_func_physical_plan(async_func, ctx, extension_codec)
            }
        }
    }

    /// Serialize an [`ExecutionPlan`] into a `PhysicalPlanNode`.
    ///
    /// Tries each built-in operator type in turn via `downcast_ref`; if none
    /// matches, the plan is offered to `extension_codec` and, on success,
    /// wrapped in a `PhysicalExtensionNode` together with its serialized
    /// children. Fails only if the codec also rejects the plan.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension-codec fallback; `plan` is
        // shadowed below by its `&dyn Any` view used for downcasting.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        // Data sources may contain source types this crate cannot serialize;
        // `try_from_data_source_exec` returns Ok(None) in that case so the
        // extension-codec fallback below still gets a chance.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        // Like DataSourceExec above: Ok(None) means "sink type not supported
        // here", so fall through rather than erroring out.
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>()
            && let Some(node) =
                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
            return protobuf::PhysicalPlanNode::try_from_async_func_exec(
                exec,
                extension_codec,
            );
        }

        // Not a built-in operator: delegate to the user-provided extension
        // codec, recursively serializing the children so the extension node
        // stays a self-contained subtree.
        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
501
502impl protobuf::PhysicalPlanNode {
503    fn try_into_explain_physical_plan(
504        &self,
505        explain: &protobuf::ExplainExecNode,
506        _ctx: &TaskContext,
507
508        _extension_codec: &dyn PhysicalExtensionCodec,
509    ) -> Result<Arc<dyn ExecutionPlan>> {
510        Ok(Arc::new(ExplainExec::new(
511            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
512            explain
513                .stringified_plans
514                .iter()
515                .map(|plan| plan.into())
516                .collect(),
517            explain.verbose,
518        )))
519    }
520
521    fn try_into_projection_physical_plan(
522        &self,
523        projection: &protobuf::ProjectionExecNode,
524        ctx: &TaskContext,
525
526        extension_codec: &dyn PhysicalExtensionCodec,
527    ) -> Result<Arc<dyn ExecutionPlan>> {
528        let input: Arc<dyn ExecutionPlan> =
529            into_physical_plan(&projection.input, ctx, extension_codec)?;
530        let exprs = projection
531            .expr
532            .iter()
533            .zip(projection.expr_name.iter())
534            .map(|(expr, name)| {
535                Ok((
536                    parse_physical_expr(
537                        expr,
538                        ctx,
539                        input.schema().as_ref(),
540                        extension_codec,
541                    )?,
542                    name.to_string(),
543                ))
544            })
545            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
546        let proj_exprs: Vec<ProjectionExpr> = exprs
547            .into_iter()
548            .map(|(expr, alias)| ProjectionExpr { expr, alias })
549            .collect();
550        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
551    }
552
553    fn try_into_filter_physical_plan(
554        &self,
555        filter: &protobuf::FilterExecNode,
556        ctx: &TaskContext,
557
558        extension_codec: &dyn PhysicalExtensionCodec,
559    ) -> Result<Arc<dyn ExecutionPlan>> {
560        let input: Arc<dyn ExecutionPlan> =
561            into_physical_plan(&filter.input, ctx, extension_codec)?;
562
563        let predicate = filter
564            .expr
565            .as_ref()
566            .map(|expr| {
567                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
568            })
569            .transpose()?
570            .ok_or_else(|| {
571                internal_datafusion_err!(
572                    "filter (FilterExecNode) in PhysicalPlanNode is missing."
573                )
574            })?;
575
576        let filter_selectivity = filter.default_filter_selectivity.try_into();
577        let projection = if !filter.projection.is_empty() {
578            Some(
579                filter
580                    .projection
581                    .iter()
582                    .map(|i| *i as usize)
583                    .collect::<Vec<_>>(),
584            )
585        } else {
586            None
587        };
588
589        let filter =
590            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
591        match filter_selectivity {
592            Ok(filter_selectivity) => Ok(Arc::new(
593                filter.with_default_selectivity(filter_selectivity)?,
594            )),
595            Err(_) => Err(internal_datafusion_err!(
596                "filter_selectivity in PhysicalPlanNode is invalid "
597            )),
598        }
599    }
600
601    fn try_into_csv_scan_physical_plan(
602        &self,
603        scan: &protobuf::CsvScanExecNode,
604        ctx: &TaskContext,
605
606        extension_codec: &dyn PhysicalExtensionCodec,
607    ) -> Result<Arc<dyn ExecutionPlan>> {
608        let escape =
609            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
610                &scan.optional_escape
611            {
612                Some(str_to_byte(escape, "escape")?)
613            } else {
614                None
615            };
616
617        let comment = if let Some(
618            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
619        ) = &scan.optional_comment
620        {
621            Some(str_to_byte(comment, "comment")?)
622        } else {
623            None
624        };
625
626        // Parse table schema with partition columns
627        let table_schema =
628            parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
629
630        let csv_options = CsvOptions {
631            has_header: Some(scan.has_header),
632            delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
633            quote: str_to_byte(&scan.quote, "quote")?,
634            newlines_in_values: Some(scan.newlines_in_values),
635            ..Default::default()
636        };
637        let source = Arc::new(
638            CsvSource::new(table_schema)
639                .with_csv_options(csv_options)
640                .with_escape(escape)
641                .with_comment(comment),
642        );
643
644        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
645            scan.base_conf.as_ref().unwrap(),
646            ctx,
647            extension_codec,
648            source,
649        )?)
650        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
651        .build();
652        Ok(DataSourceExec::from_data_source(conf))
653    }
654
655    fn try_into_json_scan_physical_plan(
656        &self,
657        scan: &protobuf::JsonScanExecNode,
658        ctx: &TaskContext,
659
660        extension_codec: &dyn PhysicalExtensionCodec,
661    ) -> Result<Arc<dyn ExecutionPlan>> {
662        let base_conf = scan.base_conf.as_ref().unwrap();
663        let table_schema = parse_table_schema_from_proto(base_conf)?;
664        let scan_conf = parse_protobuf_file_scan_config(
665            base_conf,
666            ctx,
667            extension_codec,
668            Arc::new(JsonSource::new(table_schema)),
669        )?;
670        Ok(DataSourceExec::from_data_source(scan_conf))
671    }
672
673    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
674    fn try_into_parquet_scan_physical_plan(
675        &self,
676        scan: &protobuf::ParquetScanExecNode,
677        ctx: &TaskContext,
678        extension_codec: &dyn PhysicalExtensionCodec,
679    ) -> Result<Arc<dyn ExecutionPlan>> {
680        #[cfg(feature = "parquet")]
681        {
682            let schema = from_proto::parse_protobuf_file_scan_schema(
683                scan.base_conf.as_ref().unwrap(),
684            )?;
685
686            // Check if there's a projection and use projected schema for predicate parsing
687            let base_conf = scan.base_conf.as_ref().unwrap();
688            let predicate_schema = if !base_conf.projection.is_empty() {
689                // Create projected schema for parsing the predicate
690                let projected_fields: Vec<_> = base_conf
691                    .projection
692                    .iter()
693                    .map(|&i| schema.field(i as usize).clone())
694                    .collect();
695                Arc::new(arrow::datatypes::Schema::new(projected_fields))
696            } else {
697                schema
698            };
699
700            let predicate = scan
701                .predicate
702                .as_ref()
703                .map(|expr| {
704                    parse_physical_expr(
705                        expr,
706                        ctx,
707                        predicate_schema.as_ref(),
708                        extension_codec,
709                    )
710                })
711                .transpose()?;
712            let mut options = datafusion_common::config::TableParquetOptions::default();
713
714            if let Some(table_options) = scan.parquet_options.as_ref() {
715                options = table_options.try_into()?;
716            }
717
718            // Parse table schema with partition columns
719            let table_schema = parse_table_schema_from_proto(base_conf)?;
720
721            let mut source =
722                ParquetSource::new(table_schema).with_table_parquet_options(options);
723
724            if let Some(predicate) = predicate {
725                source = source.with_predicate(predicate);
726            }
727            let base_config = parse_protobuf_file_scan_config(
728                base_conf,
729                ctx,
730                extension_codec,
731                Arc::new(source),
732            )?;
733            Ok(DataSourceExec::from_data_source(base_config))
734        }
735        #[cfg(not(feature = "parquet"))]
736        panic!(
737            "Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled"
738        )
739    }
740
741    #[cfg_attr(not(feature = "avro"), expect(unused_variables))]
742    fn try_into_avro_scan_physical_plan(
743        &self,
744        scan: &protobuf::AvroScanExecNode,
745        ctx: &TaskContext,
746        extension_codec: &dyn PhysicalExtensionCodec,
747    ) -> Result<Arc<dyn ExecutionPlan>> {
748        #[cfg(feature = "avro")]
749        {
750            let table_schema =
751                parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
752            let conf = parse_protobuf_file_scan_config(
753                scan.base_conf.as_ref().unwrap(),
754                ctx,
755                extension_codec,
756                Arc::new(AvroSource::new(table_schema)),
757            )?;
758            Ok(DataSourceExec::from_data_source(conf))
759        }
760
761        #[cfg(not(feature = "avro"))]
762        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
763    }
764
765    fn try_into_memory_scan_physical_plan(
766        &self,
767        scan: &protobuf::MemoryScanExecNode,
768        ctx: &TaskContext,
769
770        extension_codec: &dyn PhysicalExtensionCodec,
771    ) -> Result<Arc<dyn ExecutionPlan>> {
772        let partitions = scan
773            .partitions
774            .iter()
775            .map(|p| parse_record_batches(p))
776            .collect::<Result<Vec<_>>>()?;
777
778        let proto_schema = scan.schema.as_ref().ok_or_else(|| {
779            internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
780        })?;
781        let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
782
783        let projection = if !scan.projection.is_empty() {
784            Some(
785                scan.projection
786                    .iter()
787                    .map(|i| *i as usize)
788                    .collect::<Vec<_>>(),
789            )
790        } else {
791            None
792        };
793
794        let mut sort_information = vec![];
795        for ordering in &scan.sort_information {
796            let sort_exprs = parse_physical_sort_exprs(
797                &ordering.physical_sort_expr_nodes,
798                ctx,
799                &schema,
800                extension_codec,
801            )?;
802            sort_information.extend(LexOrdering::new(sort_exprs));
803        }
804
805        let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
806            .with_limit(scan.fetch.map(|f| f as usize))
807            .with_show_sizes(scan.show_sizes);
808
809        let source = source.try_with_sort_information(sort_information)?;
810
811        Ok(DataSourceExec::from_data_source(source))
812    }
813
814    fn try_into_coalesce_batches_physical_plan(
815        &self,
816        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
817        ctx: &TaskContext,
818
819        extension_codec: &dyn PhysicalExtensionCodec,
820    ) -> Result<Arc<dyn ExecutionPlan>> {
821        let input: Arc<dyn ExecutionPlan> =
822            into_physical_plan(&coalesce_batches.input, ctx, extension_codec)?;
823        Ok(Arc::new(
824            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
825                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
826        ))
827    }
828
829    fn try_into_merge_physical_plan(
830        &self,
831        merge: &protobuf::CoalescePartitionsExecNode,
832        ctx: &TaskContext,
833
834        extension_codec: &dyn PhysicalExtensionCodec,
835    ) -> Result<Arc<dyn ExecutionPlan>> {
836        let input: Arc<dyn ExecutionPlan> =
837            into_physical_plan(&merge.input, ctx, extension_codec)?;
838        Ok(Arc::new(
839            CoalescePartitionsExec::new(input)
840                .with_fetch(merge.fetch.map(|f| f as usize)),
841        ))
842    }
843
844    fn try_into_repartition_physical_plan(
845        &self,
846        repart: &protobuf::RepartitionExecNode,
847        ctx: &TaskContext,
848
849        extension_codec: &dyn PhysicalExtensionCodec,
850    ) -> Result<Arc<dyn ExecutionPlan>> {
851        let input: Arc<dyn ExecutionPlan> =
852            into_physical_plan(&repart.input, ctx, extension_codec)?;
853        let partitioning = parse_protobuf_partitioning(
854            repart.partitioning.as_ref(),
855            ctx,
856            input.schema().as_ref(),
857            extension_codec,
858        )?;
859        Ok(Arc::new(RepartitionExec::try_new(
860            input,
861            partitioning.unwrap(),
862        )?))
863    }
864
865    fn try_into_global_limit_physical_plan(
866        &self,
867        limit: &protobuf::GlobalLimitExecNode,
868        ctx: &TaskContext,
869
870        extension_codec: &dyn PhysicalExtensionCodec,
871    ) -> Result<Arc<dyn ExecutionPlan>> {
872        let input: Arc<dyn ExecutionPlan> =
873            into_physical_plan(&limit.input, ctx, extension_codec)?;
874        let fetch = if limit.fetch >= 0 {
875            Some(limit.fetch as usize)
876        } else {
877            None
878        };
879        Ok(Arc::new(GlobalLimitExec::new(
880            input,
881            limit.skip as usize,
882            fetch,
883        )))
884    }
885
886    fn try_into_local_limit_physical_plan(
887        &self,
888        limit: &protobuf::LocalLimitExecNode,
889        ctx: &TaskContext,
890
891        extension_codec: &dyn PhysicalExtensionCodec,
892    ) -> Result<Arc<dyn ExecutionPlan>> {
893        let input: Arc<dyn ExecutionPlan> =
894            into_physical_plan(&limit.input, ctx, extension_codec)?;
895        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
896    }
897
    /// Decodes a `WindowAggExecNode` into a window execution plan.
    ///
    /// When the node carries an `input_order_mode`, the input's ordering is
    /// known and a `BoundedWindowAggExec` is built with that mode; otherwise
    /// a plain `WindowAggExec` is used.
    fn try_into_window_physical_plan(
        &self,
        window_agg: &protobuf::WindowAggExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&window_agg.input, ctx, extension_codec)?;
        let input_schema = input.schema();

        // Window expressions are resolved against the child's schema.
        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
            .window_expr
            .iter()
            .map(|window_expr| {
                parse_physical_window_expr(
                    window_expr,
                    ctx,
                    input_schema.as_ref(),
                    extension_codec,
                )
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Only the presence of partition keys is used below (`!is_empty()`),
        // to tell the exec whether PARTITION BY columns exist.
        let partition_keys = window_agg
            .partition_keys
            .iter()
            .map(|expr| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
            })
            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;

        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
            // Map the wire representation onto the engine's InputOrderMode.
            let input_order_mode = match input_order_mode {
                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
                window_agg_exec_node::InputOrderMode::PartiallySorted(
                    protobuf::PartiallySortedInputOrderMode { columns },
                ) => InputOrderMode::PartiallySorted(
                    columns.iter().map(|c| *c as usize).collect(),
                ),
                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
            };

            Ok(Arc::new(BoundedWindowAggExec::try_new(
                physical_window_expr,
                input,
                input_order_mode,
                !partition_keys.is_empty(),
            )?))
        } else {
            Ok(Arc::new(WindowAggExec::try_new(
                physical_window_expr,
                input,
                !partition_keys.is_empty(),
            )?))
        }
    }
955
    /// Decodes an `AggregateExecNode` into an `AggregateExec`.
    ///
    /// Reconstructs, in order: the child plan, the aggregation mode, group-by
    /// expressions (including grouping-set null expressions and the boolean
    /// grouping masks), per-aggregate filter expressions, and the aggregate
    /// (UDAF) expressions themselves, then applies an optional row limit.
    fn try_into_aggregate_physical_plan(
        &self,
        hash_agg: &protobuf::AggregateExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hash_agg.input, ctx, extension_codec)?;
        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
            proto_error(format!(
                "Received a AggregateNode message with unknown AggregateMode {}",
                hash_agg.mode
            ))
        })?;
        // Map the wire enum onto the engine's AggregateMode.
        let agg_mode: AggregateMode = match mode {
            protobuf::AggregateMode::Partial => AggregateMode::Partial,
            protobuf::AggregateMode::Final => AggregateMode::Final,
            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
            protobuf::AggregateMode::Single => AggregateMode::Single,
            protobuf::AggregateMode::SinglePartitioned => {
                AggregateMode::SinglePartitioned
            }
        };

        // Used below to slice the flat `groups` bool vector into per-group
        // rows of grouping masks.
        let num_expr = hash_agg.group_expr.len();

        // Group-by expressions are paired positionally with their names.
        let group_expr = hash_agg
            .group_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Null expressions (for grouping sets) reuse the group expression
        // names, since they stand in for the same output columns.
        let null_expr = hash_agg
            .null_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // `groups` is serialized as a flat bool array; each chunk of
        // `num_expr` flags describes one grouping set.
        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
            hash_agg
                .groups
                .chunks(num_expr)
                .map(|g| g.to_vec())
                .collect::<Vec<Vec<bool>>>()
        } else {
            vec![]
        };

        let has_grouping_set = hash_agg.has_grouping_set;

        // The serialized input schema may differ from `input.schema()` (e.g.
        // after partial aggregation), so aggregate arguments are parsed
        // against it rather than the child's schema.
        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
            internal_datafusion_err!("input_schema in AggregateNode is missing.")
        })?;
        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

        // One optional FILTER (WHERE ...) expression per aggregate.
        let physical_filter_expr = hash_agg
            .filter_expr
            .iter()
            .map(|expr| {
                expr.expr
                    .as_ref()
                    .map(|e| {
                        parse_physical_expr(e, ctx, &physical_schema, extension_codec)
                    })
                    .transpose()
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Rebuild each aggregate expression. Only user-defined aggregate
        // functions are representable; the UDAF is looked up from the codec's
        // encoded definition first, then the task context, then the codec by
        // name alone.
        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
            .aggr_expr
            .iter()
            .zip(hash_agg.aggr_expr_name.iter())
            .map(|(expr, name)| {
                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error("Unexpected empty aggregate physical expression")
                })?;

                match expr_type {
                    ExprType::AggregateExpr(agg_node) => {
                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                            .expr
                            .iter()
                            .map(|e| {
                                parse_physical_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        // ORDER BY clause attached to the aggregate, if any.
                        let order_bys = agg_node
                            .ordering_req
                            .iter()
                            .map(|e| {
                                parse_physical_sort_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    extension_codec,
                                )
                            })
                            .collect::<Result<_>>()?;
                        agg_node
                            .aggregate_function
                            .as_ref()
                            .map(|func| match func {
                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                    let agg_udf = match &agg_node.fun_definition {
                                        Some(buf) => extension_codec
                                            .try_decode_udaf(udaf_name, buf)?,
                                        None => ctx.udaf(udaf_name).or_else(|_| {
                                            extension_codec
                                                .try_decode_udaf(udaf_name, &[])
                                        })?,
                                    };

                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                        .schema(Arc::clone(&physical_schema))
                                        .alias(name)
                                        .human_display(agg_node.human_display.clone())
                                        .with_ignore_nulls(agg_node.ignore_nulls)
                                        .with_distinct(agg_node.distinct)
                                        .order_by(order_bys)
                                        .build()
                                        .map(Arc::new)
                                }
                            })
                            .transpose()?
                            .ok_or_else(|| {
                                proto_error(
                                    "Invalid AggregateExpr, missing aggregate_function",
                                )
                            })
                    }
                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        let limit = hash_agg
            .limit
            .as_ref()
            .map(|lit_value| lit_value.limit as usize);

        let agg = AggregateExec::try_new(
            agg_mode,
            PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set),
            physical_aggr_expr,
            physical_filter_expr,
            input,
            physical_schema,
        )?;

        let agg = agg.with_limit(limit);

        Ok(Arc::new(agg))
    }
1123
    /// Decodes a `HashJoinExecNode` into a `HashJoinExec`.
    ///
    /// Rebuilds both children, the equi-join key pairs, the join type and
    /// null-equality semantics, an optional non-equi join filter, the
    /// partition mode, and an optional output projection.
    fn try_into_hash_join_physical_plan(
        &self,
        hashjoin: &protobuf::HashJoinExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.left, ctx, extension_codec)?;
        let right: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hashjoin.right, ctx, extension_codec)?;
        let left_schema = left.schema();
        let right_schema = right.schema();
        // Each ON pair is parsed against the schema of its own side.
        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
            .on
            .iter()
            .map(|col| {
                let left = parse_physical_expr(
                    &col.left.clone().unwrap(),
                    ctx,
                    left_schema.as_ref(),
                    extension_codec,
                )?;
                let right = parse_physical_expr(
                    &col.right.clone().unwrap(),
                    ctx,
                    right_schema.as_ref(),
                    extension_codec,
                )?;
                Ok((left, right))
            })
            .collect::<Result<_>>()?;
        let join_type =
            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown JoinType {}",
                    hashjoin.join_type
                ))
            })?;
        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
            .map_err(|_| {
                proto_error(format!(
                    "Received a HashJoinNode message with unknown NullEquality {}",
                    hashjoin.null_equality
                ))
            })?;
        // The optional join filter carries its own intermediate schema; its
        // expression is parsed against that schema, and column_indices map
        // filter columns back to the left/right inputs.
        let filter = hashjoin
            .filter
            .as_ref()
            .map(|f| {
                let schema = f
                    .schema
                    .as_ref()
                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                    .try_into()?;

                let expression = parse_physical_expr(
                    f.expression.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty filter expression")
                    })?,
                    ctx, &schema,
                    extension_codec,
                )?;
                let column_indices = f.column_indices
                    .iter()
                    .map(|i| {
                        let side = protobuf::JoinSide::try_from(i.side)
                            .map_err(|_| proto_error(format!(
                                "Received a HashJoinNode message with JoinSide in Filter {}",
                                i.side))
                            )?;

                        Ok(ColumnIndex {
                            index: i.index as usize,
                            side: side.into(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;

                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
            })
            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
            .map_err(|_| {
            proto_error(format!(
                "Received a HashJoinNode message with unknown PartitionMode {}",
                hashjoin.partition_mode
            ))
        })?;
        let partition_mode = match partition_mode {
            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
            protobuf::PartitionMode::Auto => PartitionMode::Auto,
        };
        // An empty projection list encodes "no projection".
        let projection = if !hashjoin.projection.is_empty() {
            Some(
                hashjoin
                    .projection
                    .iter()
                    .map(|i| *i as usize)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };
        Ok(Arc::new(HashJoinExec::try_new(
            left,
            right,
            on,
            filter,
            &join_type.into(),
            projection,
            partition_mode,
            null_equality.into(),
        )?))
    }
1241
1242    fn try_into_symmetric_hash_join_physical_plan(
1243        &self,
1244        sym_join: &protobuf::SymmetricHashJoinExecNode,
1245        ctx: &TaskContext,
1246
1247        extension_codec: &dyn PhysicalExtensionCodec,
1248    ) -> Result<Arc<dyn ExecutionPlan>> {
1249        let left = into_physical_plan(&sym_join.left, ctx, extension_codec)?;
1250        let right = into_physical_plan(&sym_join.right, ctx, extension_codec)?;
1251        let left_schema = left.schema();
1252        let right_schema = right.schema();
1253        let on = sym_join
1254            .on
1255            .iter()
1256            .map(|col| {
1257                let left = parse_physical_expr(
1258                    &col.left.clone().unwrap(),
1259                    ctx,
1260                    left_schema.as_ref(),
1261                    extension_codec,
1262                )?;
1263                let right = parse_physical_expr(
1264                    &col.right.clone().unwrap(),
1265                    ctx,
1266                    right_schema.as_ref(),
1267                    extension_codec,
1268                )?;
1269                Ok((left, right))
1270            })
1271            .collect::<Result<_>>()?;
1272        let join_type =
1273            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1274                proto_error(format!(
1275                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1276                    sym_join.join_type
1277                ))
1278            })?;
1279        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1280            .map_err(|_| {
1281                proto_error(format!(
1282                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
1283                    sym_join.null_equality
1284                ))
1285            })?;
1286        let filter = sym_join
1287            .filter
1288            .as_ref()
1289            .map(|f| {
1290                let schema = f
1291                    .schema
1292                    .as_ref()
1293                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1294                    .try_into()?;
1295
1296                let expression = parse_physical_expr(
1297                    f.expression.as_ref().ok_or_else(|| {
1298                        proto_error("Unexpected empty filter expression")
1299                    })?,
1300                    ctx, &schema,
1301                    extension_codec,
1302                )?;
1303                let column_indices = f.column_indices
1304                    .iter()
1305                    .map(|i| {
1306                        let side = protobuf::JoinSide::try_from(i.side)
1307                            .map_err(|_| proto_error(format!(
1308                                "Received a HashJoinNode message with JoinSide in Filter {}",
1309                                i.side))
1310                            )?;
1311
1312                        Ok(ColumnIndex {
1313                            index: i.index as usize,
1314                            side: side.into(),
1315                        })
1316                    })
1317                    .collect::<Result<_>>()?;
1318
1319                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1320            })
1321            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1322
1323        let left_sort_exprs = parse_physical_sort_exprs(
1324            &sym_join.left_sort_exprs,
1325            ctx,
1326            &left_schema,
1327            extension_codec,
1328        )?;
1329        let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1330
1331        let right_sort_exprs = parse_physical_sort_exprs(
1332            &sym_join.right_sort_exprs,
1333            ctx,
1334            &right_schema,
1335            extension_codec,
1336        )?;
1337        let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1338
1339        let partition_mode = protobuf::StreamPartitionMode::try_from(
1340            sym_join.partition_mode,
1341        )
1342        .map_err(|_| {
1343            proto_error(format!(
1344                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1345                sym_join.partition_mode
1346            ))
1347        })?;
1348        let partition_mode = match partition_mode {
1349            protobuf::StreamPartitionMode::SinglePartition => {
1350                StreamJoinPartitionMode::SinglePartition
1351            }
1352            protobuf::StreamPartitionMode::PartitionedExec => {
1353                StreamJoinPartitionMode::Partitioned
1354            }
1355        };
1356        SymmetricHashJoinExec::try_new(
1357            left,
1358            right,
1359            on,
1360            filter,
1361            &join_type.into(),
1362            null_equality.into(),
1363            left_sort_exprs,
1364            right_sort_exprs,
1365            partition_mode,
1366        )
1367        .map(|e| Arc::new(e) as _)
1368    }
1369
1370    fn try_into_union_physical_plan(
1371        &self,
1372        union: &protobuf::UnionExecNode,
1373        ctx: &TaskContext,
1374
1375        extension_codec: &dyn PhysicalExtensionCodec,
1376    ) -> Result<Arc<dyn ExecutionPlan>> {
1377        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1378        for input in &union.inputs {
1379            inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
1380        }
1381        UnionExec::try_new(inputs)
1382    }
1383
1384    fn try_into_interleave_physical_plan(
1385        &self,
1386        interleave: &protobuf::InterleaveExecNode,
1387        ctx: &TaskContext,
1388
1389        extension_codec: &dyn PhysicalExtensionCodec,
1390    ) -> Result<Arc<dyn ExecutionPlan>> {
1391        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1392        for input in &interleave.inputs {
1393            inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
1394        }
1395        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1396    }
1397
1398    fn try_into_cross_join_physical_plan(
1399        &self,
1400        crossjoin: &protobuf::CrossJoinExecNode,
1401        ctx: &TaskContext,
1402
1403        extension_codec: &dyn PhysicalExtensionCodec,
1404    ) -> Result<Arc<dyn ExecutionPlan>> {
1405        let left: Arc<dyn ExecutionPlan> =
1406            into_physical_plan(&crossjoin.left, ctx, extension_codec)?;
1407        let right: Arc<dyn ExecutionPlan> =
1408            into_physical_plan(&crossjoin.right, ctx, extension_codec)?;
1409        Ok(Arc::new(CrossJoinExec::new(left, right)))
1410    }
1411
1412    fn try_into_empty_physical_plan(
1413        &self,
1414        empty: &protobuf::EmptyExecNode,
1415        _ctx: &TaskContext,
1416
1417        _extension_codec: &dyn PhysicalExtensionCodec,
1418    ) -> Result<Arc<dyn ExecutionPlan>> {
1419        let schema = Arc::new(convert_required!(empty.schema)?);
1420        Ok(Arc::new(EmptyExec::new(schema)))
1421    }
1422
1423    fn try_into_placeholder_row_physical_plan(
1424        &self,
1425        placeholder: &protobuf::PlaceholderRowExecNode,
1426        _ctx: &TaskContext,
1427
1428        _extension_codec: &dyn PhysicalExtensionCodec,
1429    ) -> Result<Arc<dyn ExecutionPlan>> {
1430        let schema = Arc::new(convert_required!(placeholder.schema)?);
1431        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1432    }
1433
    /// Decodes a `SortExecNode` into a `SortExec`.
    ///
    /// Every serialized expression must be a `Sort` variant wrapping the key
    /// expression plus asc/nulls-first flags; anything else is a decode error.
    fn try_into_sort_physical_plan(
        &self,
        sort: &protobuf::SortExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
        let exprs = sort
            .expr
            .iter()
            .map(|expr| {
                let expr = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "physical_plan::from_proto() Unexpected expr {self:?}"
                    ))
                })?;
                if let ExprType::Sort(sort_expr) = expr {
                    let expr = sort_expr
                        .expr
                        .as_ref()
                        .ok_or_else(|| {
                            proto_error(format!(
                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
                            ))
                        })?
                        .as_ref();
                    Ok(PhysicalSortExpr {
                        expr: parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)?,
                        // The wire format stores `asc`; SortOptions stores
                        // `descending`, hence the negation.
                        options: SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    })
                } else {
                    internal_err!(
                        "physical_plan::from_proto() {self:?}"
                    )
                }
            })
            .collect::<Result<Vec<_>>>()?;
        // LexOrdering::new returns None for an empty expression list, which
        // would make the SortExec meaningless.
        let Some(ordering) = LexOrdering::new(exprs) else {
            return internal_err!("SortExec requires an ordering");
        };
        // A negative fetch encodes "no limit" on the wire.
        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
        let new_sort = SortExec::new(ordering, input)
            .with_fetch(fetch)
            .with_preserve_partitioning(sort.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }
1485
1486    fn try_into_sort_preserving_merge_physical_plan(
1487        &self,
1488        sort: &protobuf::SortPreservingMergeExecNode,
1489        ctx: &TaskContext,
1490
1491        extension_codec: &dyn PhysicalExtensionCodec,
1492    ) -> Result<Arc<dyn ExecutionPlan>> {
1493        let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
1494        let exprs = sort
1495            .expr
1496            .iter()
1497            .map(|expr| {
1498                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1499                    proto_error(format!(
1500                        "physical_plan::from_proto() Unexpected expr {self:?}"
1501                    ))
1502                })?;
1503                if let ExprType::Sort(sort_expr) = expr {
1504                    let expr = sort_expr
1505                        .expr
1506                        .as_ref()
1507                        .ok_or_else(|| {
1508                            proto_error(format!(
1509                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1510                        ))
1511                        })?
1512                        .as_ref();
1513                    Ok(PhysicalSortExpr {
1514                        expr: parse_physical_expr(
1515                            expr,
1516                            ctx,
1517                            input.schema().as_ref(),
1518                            extension_codec,
1519                        )?,
1520                        options: SortOptions {
1521                            descending: !sort_expr.asc,
1522                            nulls_first: sort_expr.nulls_first,
1523                        },
1524                    })
1525                } else {
1526                    internal_err!("physical_plan::from_proto() {self:?}")
1527                }
1528            })
1529            .collect::<Result<Vec<_>>>()?;
1530        let Some(ordering) = LexOrdering::new(exprs) else {
1531            return internal_err!("SortExec requires an ordering");
1532        };
1533        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1534        Ok(Arc::new(
1535            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1536        ))
1537    }
1538
1539    fn try_into_extension_physical_plan(
1540        &self,
1541        extension: &protobuf::PhysicalExtensionNode,
1542        ctx: &TaskContext,
1543
1544        extension_codec: &dyn PhysicalExtensionCodec,
1545    ) -> Result<Arc<dyn ExecutionPlan>> {
1546        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1547            .inputs
1548            .iter()
1549            .map(|i| i.try_into_physical_plan(ctx, extension_codec))
1550            .collect::<Result<_>>()?;
1551
1552        let extension_node =
1553            extension_codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;
1554
1555        Ok(extension_node)
1556    }
1557
1558    fn try_into_nested_loop_join_physical_plan(
1559        &self,
1560        join: &protobuf::NestedLoopJoinExecNode,
1561        ctx: &TaskContext,
1562
1563        extension_codec: &dyn PhysicalExtensionCodec,
1564    ) -> Result<Arc<dyn ExecutionPlan>> {
1565        let left: Arc<dyn ExecutionPlan> =
1566            into_physical_plan(&join.left, ctx, extension_codec)?;
1567        let right: Arc<dyn ExecutionPlan> =
1568            into_physical_plan(&join.right, ctx, extension_codec)?;
1569        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1570            proto_error(format!(
1571                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1572                join.join_type
1573            ))
1574        })?;
1575        let filter = join
1576                    .filter
1577                    .as_ref()
1578                    .map(|f| {
1579                        let schema = f
1580                            .schema
1581                            .as_ref()
1582                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1583                            .try_into()?;
1584
1585                        let expression = parse_physical_expr(
1586                            f.expression.as_ref().ok_or_else(|| {
1587                                proto_error("Unexpected empty filter expression")
1588                            })?,
1589                            ctx, &schema,
1590                            extension_codec,
1591                        )?;
1592                        let column_indices = f.column_indices
1593                            .iter()
1594                            .map(|i| {
1595                                let side = protobuf::JoinSide::try_from(i.side)
1596                                    .map_err(|_| proto_error(format!(
1597                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1598                                        i.side))
1599                                    )?;
1600
1601                                Ok(ColumnIndex {
1602                                    index: i.index as usize,
1603                                    side: side.into(),
1604                                })
1605                            })
1606                            .collect::<Result<Vec<_>>>()?;
1607
1608                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1609                    })
1610                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1611
1612        let projection = if !join.projection.is_empty() {
1613            Some(
1614                join.projection
1615                    .iter()
1616                    .map(|i| *i as usize)
1617                    .collect::<Vec<_>>(),
1618            )
1619        } else {
1620            None
1621        };
1622
1623        Ok(Arc::new(NestedLoopJoinExec::try_new(
1624            left,
1625            right,
1626            filter,
1627            &join_type.into(),
1628            projection,
1629        )?))
1630    }
1631
1632    fn try_into_analyze_physical_plan(
1633        &self,
1634        analyze: &protobuf::AnalyzeExecNode,
1635        ctx: &TaskContext,
1636
1637        extension_codec: &dyn PhysicalExtensionCodec,
1638    ) -> Result<Arc<dyn ExecutionPlan>> {
1639        let input: Arc<dyn ExecutionPlan> =
1640            into_physical_plan(&analyze.input, ctx, extension_codec)?;
1641        Ok(Arc::new(AnalyzeExec::new(
1642            analyze.verbose,
1643            analyze.show_statistics,
1644            vec![MetricType::SUMMARY, MetricType::DEV],
1645            input,
1646            Arc::new(convert_required!(analyze.schema)?),
1647        )))
1648    }
1649
1650    fn try_into_json_sink_physical_plan(
1651        &self,
1652        sink: &protobuf::JsonSinkExecNode,
1653        ctx: &TaskContext,
1654
1655        extension_codec: &dyn PhysicalExtensionCodec,
1656    ) -> Result<Arc<dyn ExecutionPlan>> {
1657        let input = into_physical_plan(&sink.input, ctx, extension_codec)?;
1658
1659        let data_sink: JsonSink = sink
1660            .sink
1661            .as_ref()
1662            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1663            .try_into()?;
1664        let sink_schema = input.schema();
1665        let sort_order = sink
1666            .sort_order
1667            .as_ref()
1668            .map(|collection| {
1669                parse_physical_sort_exprs(
1670                    &collection.physical_sort_expr_nodes,
1671                    ctx,
1672                    &sink_schema,
1673                    extension_codec,
1674                )
1675                .map(|sort_exprs| {
1676                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1677                })
1678            })
1679            .transpose()?
1680            .flatten();
1681        Ok(Arc::new(DataSinkExec::new(
1682            input,
1683            Arc::new(data_sink),
1684            sort_order,
1685        )))
1686    }
1687
1688    fn try_into_csv_sink_physical_plan(
1689        &self,
1690        sink: &protobuf::CsvSinkExecNode,
1691        ctx: &TaskContext,
1692
1693        extension_codec: &dyn PhysicalExtensionCodec,
1694    ) -> Result<Arc<dyn ExecutionPlan>> {
1695        let input = into_physical_plan(&sink.input, ctx, extension_codec)?;
1696
1697        let data_sink: CsvSink = sink
1698            .sink
1699            .as_ref()
1700            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1701            .try_into()?;
1702        let sink_schema = input.schema();
1703        let sort_order = sink
1704            .sort_order
1705            .as_ref()
1706            .map(|collection| {
1707                parse_physical_sort_exprs(
1708                    &collection.physical_sort_expr_nodes,
1709                    ctx,
1710                    &sink_schema,
1711                    extension_codec,
1712                )
1713                .map(|sort_exprs| {
1714                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1715                })
1716            })
1717            .transpose()?
1718            .flatten();
1719        Ok(Arc::new(DataSinkExec::new(
1720            input,
1721            Arc::new(data_sink),
1722            sort_order,
1723        )))
1724    }
1725
    /// Rebuilds a Parquet [`DataSinkExec`] from protobuf.
    ///
    /// Only meaningful with the `parquet` feature enabled; without it this
    /// method panics, since receiving such a node means the plan cannot be
    /// honored by this build.
    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
    fn try_into_parquet_sink_physical_plan(
        &self,
        sink: &protobuf::ParquetSinkExecNode,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[cfg(feature = "parquet")]
        {
            let input = into_physical_plan(&sink.input, ctx, extension_codec)?;

            // The sink configuration is a required field on the wire.
            let data_sink: ParquetSink = sink
                .sink
                .as_ref()
                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
                .try_into()?;
            let sink_schema = input.schema();
            // Optional sort requirement on the sink's input; the trailing
            // `flatten` collapses the case where the encoded expression
            // list is empty (LexRequirement::new returns None).
            let sort_order = sink
                .sort_order
                .as_ref()
                .map(|collection| {
                    parse_physical_sort_exprs(
                        &collection.physical_sort_expr_nodes,
                        ctx,
                        &sink_schema,
                        extension_codec,
                    )
                    .map(|sort_exprs| {
                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
                    })
                })
                .transpose()?
                .flatten();
            Ok(Arc::new(DataSinkExec::new(
                input,
                Arc::new(data_sink),
                sort_order,
            )))
        }
        #[cfg(not(feature = "parquet"))]
        panic!("Trying to use ParquetSink without `parquet` feature enabled");
    }
1769
1770    fn try_into_unnest_physical_plan(
1771        &self,
1772        unnest: &protobuf::UnnestExecNode,
1773        ctx: &TaskContext,
1774
1775        extension_codec: &dyn PhysicalExtensionCodec,
1776    ) -> Result<Arc<dyn ExecutionPlan>> {
1777        let input = into_physical_plan(&unnest.input, ctx, extension_codec)?;
1778
1779        Ok(Arc::new(UnnestExec::new(
1780            input,
1781            unnest
1782                .list_type_columns
1783                .iter()
1784                .map(|c| ListUnnest {
1785                    index_in_input_schema: c.index_in_input_schema as _,
1786                    depth: c.depth as _,
1787                })
1788                .collect(),
1789            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1790            Arc::new(convert_required!(unnest.schema)?),
1791            into_required!(unnest.options)?,
1792        )?))
1793    }
1794
1795    fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
1796        match name {
1797            protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
1798            protobuf::GenerateSeriesName::GsRange => "range",
1799        }
1800    }
1801    fn try_into_sort_join(
1802        &self,
1803        sort_join: &SortMergeJoinExecNode,
1804        ctx: &TaskContext,
1805
1806        extension_codec: &dyn PhysicalExtensionCodec,
1807    ) -> Result<Arc<dyn ExecutionPlan>> {
1808        let left = into_physical_plan(&sort_join.left, ctx, extension_codec)?;
1809        let left_schema = left.schema();
1810        let right = into_physical_plan(&sort_join.right, ctx, extension_codec)?;
1811        let right_schema = right.schema();
1812
1813        let filter = sort_join
1814            .filter
1815            .as_ref()
1816            .map(|f| {
1817                let schema = f
1818                    .schema
1819                    .as_ref()
1820                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1821                    .try_into()?;
1822
1823                let expression = parse_physical_expr(
1824                    f.expression.as_ref().ok_or_else(|| {
1825                        proto_error("Unexpected empty filter expression")
1826                    })?,
1827                    ctx,
1828                    &schema,
1829                    extension_codec,
1830                )?;
1831                let column_indices = f
1832                    .column_indices
1833                    .iter()
1834                    .map(|i| {
1835                        let side =
1836                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
1837                                proto_error(format!(
1838                                    "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
1839                                    i.side
1840                                ))
1841                            })?;
1842
1843                        Ok(ColumnIndex {
1844                            index: i.index as usize,
1845                            side: side.into(),
1846                        })
1847                    })
1848                    .collect::<Result<Vec<_>>>()?;
1849
1850                Ok(JoinFilter::new(
1851                    expression,
1852                    column_indices,
1853                    Arc::new(schema),
1854                ))
1855            })
1856            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1857
1858        let join_type =
1859            protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
1860                proto_error(format!(
1861                    "Received a SortMergeJoinExecNode message with unknown JoinType {}",
1862                    sort_join.join_type
1863                ))
1864            })?;
1865
1866        let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
1867            .map_err(|_| {
1868                proto_error(format!(
1869                    "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
1870                    sort_join.null_equality
1871                ))
1872            })?;
1873
1874        let sort_options = sort_join
1875            .sort_options
1876            .iter()
1877            .map(|e| SortOptions {
1878                descending: !e.asc,
1879                nulls_first: e.nulls_first,
1880            })
1881            .collect();
1882        let on = sort_join
1883            .on
1884            .iter()
1885            .map(|col| {
1886                let left = parse_physical_expr(
1887                    &col.left.clone().unwrap(),
1888                    ctx,
1889                    left_schema.as_ref(),
1890                    extension_codec,
1891                )?;
1892                let right = parse_physical_expr(
1893                    &col.right.clone().unwrap(),
1894                    ctx,
1895                    right_schema.as_ref(),
1896                    extension_codec,
1897                )?;
1898                Ok((left, right))
1899            })
1900            .collect::<Result<_>>()?;
1901
1902        Ok(Arc::new(SortMergeJoinExec::try_new(
1903            left,
1904            right,
1905            on,
1906            filter,
1907            join_type.into(),
1908            sort_options,
1909            null_equality.into(),
1910        )?))
1911    }
1912
1913    fn try_into_generate_series_physical_plan(
1914        &self,
1915        generate_series: &protobuf::GenerateSeriesNode,
1916    ) -> Result<Arc<dyn ExecutionPlan>> {
1917        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);
1918
1919        let args = match &generate_series.args {
1920            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
1921                GenSeriesArgs::ContainsNull {
1922                    name: Self::generate_series_name_to_str(args.name()),
1923                }
1924            }
1925            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
1926                GenSeriesArgs::Int64Args {
1927                    start: args.start,
1928                    end: args.end,
1929                    step: args.step,
1930                    include_end: args.include_end,
1931                    name: Self::generate_series_name_to_str(args.name()),
1932                }
1933            }
1934            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
1935                let step_proto = args.step.as_ref().ok_or_else(|| {
1936                    internal_datafusion_err!("Missing step in TimestampArgs")
1937                })?;
1938                let step = IntervalMonthDayNanoType::make_value(
1939                    step_proto.months,
1940                    step_proto.days,
1941                    step_proto.nanos,
1942                );
1943                GenSeriesArgs::TimestampArgs {
1944                    start: args.start,
1945                    end: args.end,
1946                    step,
1947                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
1948                    include_end: args.include_end,
1949                    name: Self::generate_series_name_to_str(args.name()),
1950                }
1951            }
1952            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
1953                let step_proto = args.step.as_ref().ok_or_else(|| {
1954                    internal_datafusion_err!("Missing step in DateArgs")
1955                })?;
1956                let step = IntervalMonthDayNanoType::make_value(
1957                    step_proto.months,
1958                    step_proto.days,
1959                    step_proto.nanos,
1960                );
1961                GenSeriesArgs::DateArgs {
1962                    start: args.start,
1963                    end: args.end,
1964                    step,
1965                    include_end: args.include_end,
1966                    name: Self::generate_series_name_to_str(args.name()),
1967                }
1968            }
1969            None => return internal_err!("Missing args in GenerateSeriesNode"),
1970        };
1971
1972        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
1973        let generator = table.as_generator(generate_series.target_batch_size as usize)?;
1974
1975        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
1976    }
1977
1978    fn try_into_cooperative_physical_plan(
1979        &self,
1980        field_stream: &protobuf::CooperativeExecNode,
1981        ctx: &TaskContext,
1982
1983        extension_codec: &dyn PhysicalExtensionCodec,
1984    ) -> Result<Arc<dyn ExecutionPlan>> {
1985        let input = into_physical_plan(&field_stream.input, ctx, extension_codec)?;
1986        Ok(Arc::new(CooperativeExec::new(input)))
1987    }
1988
1989    fn try_into_async_func_physical_plan(
1990        &self,
1991        async_func: &protobuf::AsyncFuncExecNode,
1992        ctx: &TaskContext,
1993        extension_codec: &dyn PhysicalExtensionCodec,
1994    ) -> Result<Arc<dyn ExecutionPlan>> {
1995        let input: Arc<dyn ExecutionPlan> =
1996            into_physical_plan(&async_func.input, ctx, extension_codec)?;
1997
1998        if async_func.async_exprs.len() != async_func.async_expr_names.len() {
1999            return internal_err!(
2000                "AsyncFuncExecNode async_exprs length does not match async_expr_names"
2001            );
2002        }
2003
2004        let async_exprs = async_func
2005            .async_exprs
2006            .iter()
2007            .zip(async_func.async_expr_names.iter())
2008            .map(|(expr, name)| {
2009                let physical_expr = parse_physical_expr(
2010                    expr,
2011                    ctx,
2012                    input.schema().as_ref(),
2013                    extension_codec,
2014                )?;
2015
2016                Ok(Arc::new(AsyncFuncExpr::try_new(
2017                    name.clone(),
2018                    physical_expr,
2019                    input.schema().as_ref(),
2020                )?))
2021            })
2022            .collect::<Result<Vec<_>>>()?;
2023
2024        Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
2025    }
2026
2027    fn try_from_explain_exec(
2028        exec: &ExplainExec,
2029        _extension_codec: &dyn PhysicalExtensionCodec,
2030    ) -> Result<Self> {
2031        Ok(protobuf::PhysicalPlanNode {
2032            physical_plan_type: Some(PhysicalPlanType::Explain(
2033                protobuf::ExplainExecNode {
2034                    schema: Some(exec.schema().as_ref().try_into()?),
2035                    stringified_plans: exec
2036                        .stringified_plans()
2037                        .iter()
2038                        .map(|plan| plan.into())
2039                        .collect(),
2040                    verbose: exec.verbose(),
2041                },
2042            )),
2043        })
2044    }
2045
2046    fn try_from_projection_exec(
2047        exec: &ProjectionExec,
2048        extension_codec: &dyn PhysicalExtensionCodec,
2049    ) -> Result<Self> {
2050        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2051            exec.input().to_owned(),
2052            extension_codec,
2053        )?;
2054        let expr = exec
2055            .expr()
2056            .iter()
2057            .map(|proj_expr| serialize_physical_expr(&proj_expr.expr, extension_codec))
2058            .collect::<Result<Vec<_>>>()?;
2059        let expr_name = exec
2060            .expr()
2061            .iter()
2062            .map(|proj_expr| proj_expr.alias.clone())
2063            .collect();
2064        Ok(protobuf::PhysicalPlanNode {
2065            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
2066                protobuf::ProjectionExecNode {
2067                    input: Some(Box::new(input)),
2068                    expr,
2069                    expr_name,
2070                },
2071            ))),
2072        })
2073    }
2074
2075    fn try_from_analyze_exec(
2076        exec: &AnalyzeExec,
2077        extension_codec: &dyn PhysicalExtensionCodec,
2078    ) -> Result<Self> {
2079        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2080            exec.input().to_owned(),
2081            extension_codec,
2082        )?;
2083        Ok(protobuf::PhysicalPlanNode {
2084            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2085                protobuf::AnalyzeExecNode {
2086                    verbose: exec.verbose(),
2087                    show_statistics: exec.show_statistics(),
2088                    input: Some(Box::new(input)),
2089                    schema: Some(exec.schema().as_ref().try_into()?),
2090                },
2091            ))),
2092        })
2093    }
2094
2095    fn try_from_filter_exec(
2096        exec: &FilterExec,
2097        extension_codec: &dyn PhysicalExtensionCodec,
2098    ) -> Result<Self> {
2099        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2100            exec.input().to_owned(),
2101            extension_codec,
2102        )?;
2103        Ok(protobuf::PhysicalPlanNode {
2104            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2105                protobuf::FilterExecNode {
2106                    input: Some(Box::new(input)),
2107                    expr: Some(serialize_physical_expr(
2108                        exec.predicate(),
2109                        extension_codec,
2110                    )?),
2111                    default_filter_selectivity: exec.default_selectivity() as u32,
2112                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
2113                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2114                    }),
2115                },
2116            ))),
2117        })
2118    }
2119
2120    fn try_from_global_limit_exec(
2121        limit: &GlobalLimitExec,
2122        extension_codec: &dyn PhysicalExtensionCodec,
2123    ) -> Result<Self> {
2124        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2125            limit.input().to_owned(),
2126            extension_codec,
2127        )?;
2128
2129        Ok(protobuf::PhysicalPlanNode {
2130            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2131                protobuf::GlobalLimitExecNode {
2132                    input: Some(Box::new(input)),
2133                    skip: limit.skip() as u32,
2134                    fetch: match limit.fetch() {
2135                        Some(n) => n as i64,
2136                        _ => -1, // no limit
2137                    },
2138                },
2139            ))),
2140        })
2141    }
2142
2143    fn try_from_local_limit_exec(
2144        limit: &LocalLimitExec,
2145        extension_codec: &dyn PhysicalExtensionCodec,
2146    ) -> Result<Self> {
2147        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2148            limit.input().to_owned(),
2149            extension_codec,
2150        )?;
2151        Ok(protobuf::PhysicalPlanNode {
2152            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2153                protobuf::LocalLimitExecNode {
2154                    input: Some(Box::new(input)),
2155                    fetch: limit.fetch() as u32,
2156                },
2157            ))),
2158        })
2159    }
2160
2161    fn try_from_hash_join_exec(
2162        exec: &HashJoinExec,
2163        extension_codec: &dyn PhysicalExtensionCodec,
2164    ) -> Result<Self> {
2165        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2166            exec.left().to_owned(),
2167            extension_codec,
2168        )?;
2169        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2170            exec.right().to_owned(),
2171            extension_codec,
2172        )?;
2173        let on: Vec<protobuf::JoinOn> = exec
2174            .on()
2175            .iter()
2176            .map(|tuple| {
2177                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2178                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2179                Ok::<_, DataFusionError>(protobuf::JoinOn {
2180                    left: Some(l),
2181                    right: Some(r),
2182                })
2183            })
2184            .collect::<Result<_>>()?;
2185        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2186        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2187        let filter = exec
2188            .filter()
2189            .as_ref()
2190            .map(|f| {
2191                let expression =
2192                    serialize_physical_expr(f.expression(), extension_codec)?;
2193                let column_indices = f
2194                    .column_indices()
2195                    .iter()
2196                    .map(|i| {
2197                        let side: protobuf::JoinSide = i.side.to_owned().into();
2198                        protobuf::ColumnIndex {
2199                            index: i.index as u32,
2200                            side: side.into(),
2201                        }
2202                    })
2203                    .collect();
2204                let schema = f.schema().as_ref().try_into()?;
2205                Ok(protobuf::JoinFilter {
2206                    expression: Some(expression),
2207                    column_indices,
2208                    schema: Some(schema),
2209                })
2210            })
2211            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2212
2213        let partition_mode = match exec.partition_mode() {
2214            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2215            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2216            PartitionMode::Auto => protobuf::PartitionMode::Auto,
2217        };
2218
2219        Ok(protobuf::PhysicalPlanNode {
2220            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2221                protobuf::HashJoinExecNode {
2222                    left: Some(Box::new(left)),
2223                    right: Some(Box::new(right)),
2224                    on,
2225                    join_type: join_type.into(),
2226                    partition_mode: partition_mode.into(),
2227                    null_equality: null_equality.into(),
2228                    filter,
2229                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2230                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2231                    }),
2232                },
2233            ))),
2234        })
2235    }
2236
2237    fn try_from_symmetric_hash_join_exec(
2238        exec: &SymmetricHashJoinExec,
2239        extension_codec: &dyn PhysicalExtensionCodec,
2240    ) -> Result<Self> {
2241        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2242            exec.left().to_owned(),
2243            extension_codec,
2244        )?;
2245        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2246            exec.right().to_owned(),
2247            extension_codec,
2248        )?;
2249        let on = exec
2250            .on()
2251            .iter()
2252            .map(|tuple| {
2253                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2254                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2255                Ok::<_, DataFusionError>(protobuf::JoinOn {
2256                    left: Some(l),
2257                    right: Some(r),
2258                })
2259            })
2260            .collect::<Result<_>>()?;
2261        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2262        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2263        let filter = exec
2264            .filter()
2265            .as_ref()
2266            .map(|f| {
2267                let expression =
2268                    serialize_physical_expr(f.expression(), extension_codec)?;
2269                let column_indices = f
2270                    .column_indices()
2271                    .iter()
2272                    .map(|i| {
2273                        let side: protobuf::JoinSide = i.side.to_owned().into();
2274                        protobuf::ColumnIndex {
2275                            index: i.index as u32,
2276                            side: side.into(),
2277                        }
2278                    })
2279                    .collect();
2280                let schema = f.schema().as_ref().try_into()?;
2281                Ok(protobuf::JoinFilter {
2282                    expression: Some(expression),
2283                    column_indices,
2284                    schema: Some(schema),
2285                })
2286            })
2287            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2288
2289        let partition_mode = match exec.partition_mode() {
2290            StreamJoinPartitionMode::SinglePartition => {
2291                protobuf::StreamPartitionMode::SinglePartition
2292            }
2293            StreamJoinPartitionMode::Partitioned => {
2294                protobuf::StreamPartitionMode::PartitionedExec
2295            }
2296        };
2297
2298        let left_sort_exprs = exec
2299            .left_sort_exprs()
2300            .map(|exprs| {
2301                exprs
2302                    .iter()
2303                    .map(|expr| {
2304                        Ok(protobuf::PhysicalSortExprNode {
2305                            expr: Some(Box::new(serialize_physical_expr(
2306                                &expr.expr,
2307                                extension_codec,
2308                            )?)),
2309                            asc: !expr.options.descending,
2310                            nulls_first: expr.options.nulls_first,
2311                        })
2312                    })
2313                    .collect::<Result<Vec<_>>>()
2314            })
2315            .transpose()?
2316            .unwrap_or(vec![]);
2317
2318        let right_sort_exprs = exec
2319            .right_sort_exprs()
2320            .map(|exprs| {
2321                exprs
2322                    .iter()
2323                    .map(|expr| {
2324                        Ok(protobuf::PhysicalSortExprNode {
2325                            expr: Some(Box::new(serialize_physical_expr(
2326                                &expr.expr,
2327                                extension_codec,
2328                            )?)),
2329                            asc: !expr.options.descending,
2330                            nulls_first: expr.options.nulls_first,
2331                        })
2332                    })
2333                    .collect::<Result<Vec<_>>>()
2334            })
2335            .transpose()?
2336            .unwrap_or(vec![]);
2337
2338        Ok(protobuf::PhysicalPlanNode {
2339            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2340                protobuf::SymmetricHashJoinExecNode {
2341                    left: Some(Box::new(left)),
2342                    right: Some(Box::new(right)),
2343                    on,
2344                    join_type: join_type.into(),
2345                    partition_mode: partition_mode.into(),
2346                    null_equality: null_equality.into(),
2347                    left_sort_exprs,
2348                    right_sort_exprs,
2349                    filter,
2350                },
2351            ))),
2352        })
2353    }
2354
2355    fn try_from_sort_merge_join_exec(
2356        exec: &SortMergeJoinExec,
2357        extension_codec: &dyn PhysicalExtensionCodec,
2358    ) -> Result<Self> {
2359        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2360            exec.left().to_owned(),
2361            extension_codec,
2362        )?;
2363        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2364            exec.right().to_owned(),
2365            extension_codec,
2366        )?;
2367        let on = exec
2368            .on()
2369            .iter()
2370            .map(|tuple| {
2371                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2372                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2373                Ok::<_, DataFusionError>(protobuf::JoinOn {
2374                    left: Some(l),
2375                    right: Some(r),
2376                })
2377            })
2378            .collect::<Result<_>>()?;
2379        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2380        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2381        let filter = exec
2382            .filter()
2383            .as_ref()
2384            .map(|f| {
2385                let expression =
2386                    serialize_physical_expr(f.expression(), extension_codec)?;
2387                let column_indices = f
2388                    .column_indices()
2389                    .iter()
2390                    .map(|i| {
2391                        let side: protobuf::JoinSide = i.side.to_owned().into();
2392                        protobuf::ColumnIndex {
2393                            index: i.index as u32,
2394                            side: side.into(),
2395                        }
2396                    })
2397                    .collect();
2398                let schema = f.schema().as_ref().try_into()?;
2399                Ok(protobuf::JoinFilter {
2400                    expression: Some(expression),
2401                    column_indices,
2402                    schema: Some(schema),
2403                })
2404            })
2405            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2406
2407        let sort_options = exec
2408            .sort_options()
2409            .iter()
2410            .map(
2411                |SortOptions {
2412                     descending,
2413                     nulls_first,
2414                 }| {
2415                    SortExprNode {
2416                        expr: None,
2417                        asc: !*descending,
2418                        nulls_first: *nulls_first,
2419                    }
2420                },
2421            )
2422            .collect();
2423
2424        Ok(protobuf::PhysicalPlanNode {
2425            physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2426                protobuf::SortMergeJoinExecNode {
2427                    left: Some(Box::new(left)),
2428                    right: Some(Box::new(right)),
2429                    on,
2430                    join_type: join_type.into(),
2431                    null_equality: null_equality.into(),
2432                    filter,
2433                    sort_options,
2434                },
2435            ))),
2436        })
2437    }
2438
2439    fn try_from_cross_join_exec(
2440        exec: &CrossJoinExec,
2441        extension_codec: &dyn PhysicalExtensionCodec,
2442    ) -> Result<Self> {
2443        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2444            exec.left().to_owned(),
2445            extension_codec,
2446        )?;
2447        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2448            exec.right().to_owned(),
2449            extension_codec,
2450        )?;
2451        Ok(protobuf::PhysicalPlanNode {
2452            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2453                protobuf::CrossJoinExecNode {
2454                    left: Some(Box::new(left)),
2455                    right: Some(Box::new(right)),
2456                },
2457            ))),
2458        })
2459    }
2460
    /// Serializes an [`AggregateExec`] into a protobuf physical plan node.
    ///
    /// Captures the grouping expressions (with their output names and
    /// grouping-set null information), the aggregate expressions (with their
    /// names and optional filters), the aggregation mode, the optional group
    /// limit, and the serialized input plan plus input schema.
    fn try_from_aggregate_exec(
        exec: &AggregateExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self> {
        // Flattened boolean mask over all grouping sets, as produced by
        // `PhysicalGroupBy::groups()`.
        let groups: Vec<bool> = exec
            .group_expr()
            .groups()
            .iter()
            .flatten()
            .copied()
            .collect();

        // Output names of the group-by expressions (second tuple element of
        // each `(expr, name)` pair).
        let group_names = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| expr.1.to_owned())
            .collect();

        // One optional filter per aggregate expression, positionally aligned
        // with `aggr_expr` below.
        let filter = exec
            .filter_expr()
            .iter()
            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        let agg = exec
            .aggr_expr()
            .iter()
            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
            .collect::<Result<Vec<_>>>()?;

        // Aggregate output names, kept separately from the serialized
        // expressions (positionally aligned).
        let agg_names = exec
            .aggr_expr()
            .iter()
            .map(|expr| expr.name().to_string())
            .collect::<Vec<_>>();

        // One-to-one mapping of the execution aggregation mode onto the
        // protobuf enum.
        let agg_mode = match exec.mode() {
            AggregateMode::Partial => protobuf::AggregateMode::Partial,
            AggregateMode::Final => protobuf::AggregateMode::Final,
            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
            AggregateMode::Single => protobuf::AggregateMode::Single,
            AggregateMode::SinglePartitioned => {
                protobuf::AggregateMode::SinglePartitioned
            }
        };
        // Note: the *input* schema is serialized, not the aggregate's output
        // schema.
        let input_schema = exec.input_schema();
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
            exec.input().to_owned(),
            extension_codec,
        )?;

        // Null expressions associated with the grouping (first tuple element
        // of each pair returned by `null_expr()`).
        let null_expr = exec
            .group_expr()
            .null_expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        // Group-by expressions themselves (first tuple element; names were
        // collected above).
        let group_expr = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
            .collect::<Result<Vec<_>>>()?;

        // Optional group-limit optimization (e.g. for top-k style queries).
        let limit = exec.limit().map(|value| protobuf::AggLimit {
            limit: value as u64,
        });

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                protobuf::AggregateExecNode {
                    group_expr,
                    group_expr_name: group_names,
                    aggr_expr: agg,
                    filter_expr: filter,
                    aggr_expr_name: agg_names,
                    mode: agg_mode as i32,
                    input: Some(Box::new(input)),
                    input_schema: Some(input_schema.as_ref().try_into()?),
                    null_expr,
                    groups,
                    limit,
                    has_grouping_set: exec.group_expr().has_grouping_set(),
                },
            ))),
        })
    }
2550
2551    fn try_from_empty_exec(
2552        empty: &EmptyExec,
2553        _extension_codec: &dyn PhysicalExtensionCodec,
2554    ) -> Result<Self> {
2555        let schema = empty.schema().as_ref().try_into()?;
2556        Ok(protobuf::PhysicalPlanNode {
2557            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2558                schema: Some(schema),
2559            })),
2560        })
2561    }
2562
2563    fn try_from_placeholder_row_exec(
2564        empty: &PlaceholderRowExec,
2565        _extension_codec: &dyn PhysicalExtensionCodec,
2566    ) -> Result<Self> {
2567        let schema = empty.schema().as_ref().try_into()?;
2568        Ok(protobuf::PhysicalPlanNode {
2569            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2570                protobuf::PlaceholderRowExecNode {
2571                    schema: Some(schema),
2572                },
2573            )),
2574        })
2575    }
2576
2577    fn try_from_coalesce_batches_exec(
2578        coalesce_batches: &CoalesceBatchesExec,
2579        extension_codec: &dyn PhysicalExtensionCodec,
2580    ) -> Result<Self> {
2581        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2582            coalesce_batches.input().to_owned(),
2583            extension_codec,
2584        )?;
2585        Ok(protobuf::PhysicalPlanNode {
2586            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2587                protobuf::CoalesceBatchesExecNode {
2588                    input: Some(Box::new(input)),
2589                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2590                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2591                },
2592            ))),
2593        })
2594    }
2595
    /// Attempts to serialize a [`DataSourceExec`] into one of the concrete
    /// scan node variants (CSV, JSON, Parquet, Avro, or in-memory).
    ///
    /// Returns `Ok(None)` when the underlying data source is none of the
    /// known types, so the caller can fall back to the extension codec.
    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let data_source = data_source_exec.data_source();
        // CSV scan: a FileScanConfig whose file source is a CsvSource.
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                extension_codec,
                            )?),
                            has_header: csv_config.has_header(),
                            // Single-byte CSV options are transported as
                            // strings; byte_to_string fails on non-ASCII.
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                        byte_to_string(comment, "comment")?,
                                    ))
                            } else {
                                None
                            },
                            newlines_in_values: csv_config.newlines_in_values(),
                            truncate_rows: csv_config.truncate_rows(),
                        },
                    )),
                }));
            }
        }

        // JSON scan: only the base file-scan config is needed.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Parquet scan (feature-gated): also serializes the optional pushed
        // down predicate and the table-level parquet options.
        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .filter()
                .map(|pred| serialize_physical_expr(&pred, extension_codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            extension_codec,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        // Avro scan (feature-gated): only the base file-scan config.
        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                extension_codec,
                            )?),
                        },
                    )),
                }));
            }
        }

        // In-memory source: record batches are serialized inline, together
        // with schema, projection, sort information, and fetch.
        if let Some(source_conf) =
            data_source.as_any().downcast_ref::<MemorySourceConfig>()
        {
            let proto_partitions = source_conf
                .partitions()
                .iter()
                .map(|p| serialize_record_batches(p))
                .collect::<Result<Vec<_>>>()?;

            // The original (pre-projection) schema is serialized.
            let proto_schema: protobuf::Schema =
                source_conf.original_schema().as_ref().try_into()?;

            // Missing projection is encoded as an empty column-index list.
            let proto_projection = source_conf
                .projection()
                .as_ref()
                .map_or_else(Vec::new, |v| {
                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                });

            // Each known ordering becomes one sort-expression collection.
            let proto_sort_information = source_conf
                .sort_information()
                .iter()
                .map(|ordering| {
                    let sort_exprs = serialize_physical_sort_exprs(
                        ordering.to_owned(),
                        extension_codec,
                    )?;
                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
                        physical_sort_expr_nodes: sort_exprs,
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
                    protobuf::MemoryScanExecNode {
                        partitions: proto_partitions,
                        schema: Some(proto_schema),
                        projection: proto_projection,
                        sort_information: proto_sort_information,
                        show_sizes: source_conf.show_sizes(),
                        fetch: source_conf.fetch().map(|f| f as u32),
                    },
                )),
            }));
        }

        // Unknown data source: let the extension codec handle it.
        Ok(None)
    }
2746
2747    fn try_from_coalesce_partitions_exec(
2748        exec: &CoalescePartitionsExec,
2749        extension_codec: &dyn PhysicalExtensionCodec,
2750    ) -> Result<Self> {
2751        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2752            exec.input().to_owned(),
2753            extension_codec,
2754        )?;
2755        Ok(protobuf::PhysicalPlanNode {
2756            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2757                protobuf::CoalescePartitionsExecNode {
2758                    input: Some(Box::new(input)),
2759                    fetch: exec.fetch().map(|f| f as u32),
2760                },
2761            ))),
2762        })
2763    }
2764
2765    fn try_from_repartition_exec(
2766        exec: &RepartitionExec,
2767        extension_codec: &dyn PhysicalExtensionCodec,
2768    ) -> Result<Self> {
2769        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2770            exec.input().to_owned(),
2771            extension_codec,
2772        )?;
2773
2774        let pb_partitioning =
2775            serialize_partitioning(exec.partitioning(), extension_codec)?;
2776
2777        Ok(protobuf::PhysicalPlanNode {
2778            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2779                protobuf::RepartitionExecNode {
2780                    input: Some(Box::new(input)),
2781                    partitioning: Some(pb_partitioning),
2782                },
2783            ))),
2784        })
2785    }
2786
2787    fn try_from_sort_exec(
2788        exec: &SortExec,
2789        extension_codec: &dyn PhysicalExtensionCodec,
2790    ) -> Result<Self> {
2791        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2792            exec.input().to_owned(),
2793            extension_codec,
2794        )?;
2795        let expr = exec
2796            .expr()
2797            .iter()
2798            .map(|expr| {
2799                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2800                    expr: Some(Box::new(serialize_physical_expr(
2801                        &expr.expr,
2802                        extension_codec,
2803                    )?)),
2804                    asc: !expr.options.descending,
2805                    nulls_first: expr.options.nulls_first,
2806                });
2807                Ok(protobuf::PhysicalExprNode {
2808                    expr_type: Some(ExprType::Sort(sort_expr)),
2809                })
2810            })
2811            .collect::<Result<Vec<_>>>()?;
2812        Ok(protobuf::PhysicalPlanNode {
2813            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2814                protobuf::SortExecNode {
2815                    input: Some(Box::new(input)),
2816                    expr,
2817                    fetch: match exec.fetch() {
2818                        Some(n) => n as i64,
2819                        _ => -1,
2820                    },
2821                    preserve_partitioning: exec.preserve_partitioning(),
2822                },
2823            ))),
2824        })
2825    }
2826
2827    fn try_from_union_exec(
2828        union: &UnionExec,
2829        extension_codec: &dyn PhysicalExtensionCodec,
2830    ) -> Result<Self> {
2831        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2832        for input in union.inputs() {
2833            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2834                input.to_owned(),
2835                extension_codec,
2836            )?);
2837        }
2838        Ok(protobuf::PhysicalPlanNode {
2839            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2840                inputs,
2841            })),
2842        })
2843    }
2844
2845    fn try_from_interleave_exec(
2846        interleave: &InterleaveExec,
2847        extension_codec: &dyn PhysicalExtensionCodec,
2848    ) -> Result<Self> {
2849        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2850        for input in interleave.inputs() {
2851            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2852                input.to_owned(),
2853                extension_codec,
2854            )?);
2855        }
2856        Ok(protobuf::PhysicalPlanNode {
2857            physical_plan_type: Some(PhysicalPlanType::Interleave(
2858                protobuf::InterleaveExecNode { inputs },
2859            )),
2860        })
2861    }
2862
2863    fn try_from_sort_preserving_merge_exec(
2864        exec: &SortPreservingMergeExec,
2865        extension_codec: &dyn PhysicalExtensionCodec,
2866    ) -> Result<Self> {
2867        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2868            exec.input().to_owned(),
2869            extension_codec,
2870        )?;
2871        let expr = exec
2872            .expr()
2873            .iter()
2874            .map(|expr| {
2875                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2876                    expr: Some(Box::new(serialize_physical_expr(
2877                        &expr.expr,
2878                        extension_codec,
2879                    )?)),
2880                    asc: !expr.options.descending,
2881                    nulls_first: expr.options.nulls_first,
2882                });
2883                Ok(protobuf::PhysicalExprNode {
2884                    expr_type: Some(ExprType::Sort(sort_expr)),
2885                })
2886            })
2887            .collect::<Result<Vec<_>>>()?;
2888        Ok(protobuf::PhysicalPlanNode {
2889            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2890                protobuf::SortPreservingMergeExecNode {
2891                    input: Some(Box::new(input)),
2892                    expr,
2893                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2894                },
2895            ))),
2896        })
2897    }
2898
2899    fn try_from_nested_loop_join_exec(
2900        exec: &NestedLoopJoinExec,
2901        extension_codec: &dyn PhysicalExtensionCodec,
2902    ) -> Result<Self> {
2903        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2904            exec.left().to_owned(),
2905            extension_codec,
2906        )?;
2907        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2908            exec.right().to_owned(),
2909            extension_codec,
2910        )?;
2911
2912        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2913        let filter = exec
2914            .filter()
2915            .as_ref()
2916            .map(|f| {
2917                let expression =
2918                    serialize_physical_expr(f.expression(), extension_codec)?;
2919                let column_indices = f
2920                    .column_indices()
2921                    .iter()
2922                    .map(|i| {
2923                        let side: protobuf::JoinSide = i.side.to_owned().into();
2924                        protobuf::ColumnIndex {
2925                            index: i.index as u32,
2926                            side: side.into(),
2927                        }
2928                    })
2929                    .collect();
2930                let schema = f.schema().as_ref().try_into()?;
2931                Ok(protobuf::JoinFilter {
2932                    expression: Some(expression),
2933                    column_indices,
2934                    schema: Some(schema),
2935                })
2936            })
2937            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2938
2939        Ok(protobuf::PhysicalPlanNode {
2940            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2941                protobuf::NestedLoopJoinExecNode {
2942                    left: Some(Box::new(left)),
2943                    right: Some(Box::new(right)),
2944                    join_type: join_type.into(),
2945                    filter,
2946                    projection: exec.projection().map_or_else(Vec::new, |v| {
2947                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2948                    }),
2949                },
2950            ))),
2951        })
2952    }
2953
2954    fn try_from_window_agg_exec(
2955        exec: &WindowAggExec,
2956        extension_codec: &dyn PhysicalExtensionCodec,
2957    ) -> Result<Self> {
2958        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2959            exec.input().to_owned(),
2960            extension_codec,
2961        )?;
2962
2963        let window_expr = exec
2964            .window_expr()
2965            .iter()
2966            .map(|e| serialize_physical_window_expr(e, extension_codec))
2967            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2968
2969        let partition_keys = exec
2970            .partition_keys()
2971            .iter()
2972            .map(|e| serialize_physical_expr(e, extension_codec))
2973            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2974
2975        Ok(protobuf::PhysicalPlanNode {
2976            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2977                protobuf::WindowAggExecNode {
2978                    input: Some(Box::new(input)),
2979                    window_expr,
2980                    partition_keys,
2981                    input_order_mode: None,
2982                },
2983            ))),
2984        })
2985    }
2986
2987    fn try_from_bounded_window_agg_exec(
2988        exec: &BoundedWindowAggExec,
2989        extension_codec: &dyn PhysicalExtensionCodec,
2990    ) -> Result<Self> {
2991        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2992            exec.input().to_owned(),
2993            extension_codec,
2994        )?;
2995
2996        let window_expr = exec
2997            .window_expr()
2998            .iter()
2999            .map(|e| serialize_physical_window_expr(e, extension_codec))
3000            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3001
3002        let partition_keys = exec
3003            .partition_keys()
3004            .iter()
3005            .map(|e| serialize_physical_expr(e, extension_codec))
3006            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3007
3008        let input_order_mode = match &exec.input_order_mode {
3009            InputOrderMode::Linear => {
3010                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
3011            }
3012            InputOrderMode::PartiallySorted(columns) => {
3013                window_agg_exec_node::InputOrderMode::PartiallySorted(
3014                    protobuf::PartiallySortedInputOrderMode {
3015                        columns: columns.iter().map(|c| *c as u64).collect(),
3016                    },
3017                )
3018            }
3019            InputOrderMode::Sorted => {
3020                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
3021            }
3022        };
3023
3024        Ok(protobuf::PhysicalPlanNode {
3025            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3026                protobuf::WindowAggExecNode {
3027                    input: Some(Box::new(input)),
3028                    window_expr,
3029                    partition_keys,
3030                    input_order_mode: Some(input_order_mode),
3031                },
3032            ))),
3033        })
3034    }
3035
    /// Attempts to serialize a [`DataSinkExec`] into one of the concrete sink
    /// node variants (JSON, CSV, or Parquet).
    ///
    /// Returns `Ok(None)` for unknown sink types so the extension codec can
    /// handle them.
    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Option<Self>> {
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(
                exec.input().to_owned(),
                extension_codec,
            )?;
        // Optional required sort order of the sink's input, serialized as a
        // collection of sort-expression nodes.
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        // Convert each sort requirement into a concrete
                        // PhysicalSortExpr before serializing.
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(serialize_physical_expr(
                                &expr.expr,
                                extension_codec,
                            )?)),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        // JSON sink.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // CSV sink.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Parquet sink (feature-gated).
        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // If unknown DataSink then let extension handle it
        Ok(None)
    }
3112
3113    fn try_from_unnest_exec(
3114        exec: &UnnestExec,
3115        extension_codec: &dyn PhysicalExtensionCodec,
3116    ) -> Result<Self> {
3117        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3118            exec.input().to_owned(),
3119            extension_codec,
3120        )?;
3121
3122        Ok(protobuf::PhysicalPlanNode {
3123            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3124                protobuf::UnnestExecNode {
3125                    input: Some(Box::new(input)),
3126                    schema: Some(exec.schema().try_into()?),
3127                    list_type_columns: exec
3128                        .list_column_indices()
3129                        .iter()
3130                        .map(|c| ProtoListUnnest {
3131                            index_in_input_schema: c.index_in_input_schema as _,
3132                            depth: c.depth as _,
3133                        })
3134                        .collect(),
3135                    struct_type_columns: exec
3136                        .struct_column_indices()
3137                        .iter()
3138                        .map(|c| *c as _)
3139                        .collect(),
3140                    options: Some(exec.options().into()),
3141                },
3142            ))),
3143        })
3144    }
3145
3146    fn try_from_cooperative_exec(
3147        exec: &CooperativeExec,
3148        extension_codec: &dyn PhysicalExtensionCodec,
3149    ) -> Result<Self> {
3150        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3151            exec.input().to_owned(),
3152            extension_codec,
3153        )?;
3154
3155        Ok(protobuf::PhysicalPlanNode {
3156            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3157                protobuf::CooperativeExecNode {
3158                    input: Some(Box::new(input)),
3159                },
3160            ))),
3161        })
3162    }
3163
3164    fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3165        match name {
3166            "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3167            "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3168            _ => internal_err!("unknown name: {name}"),
3169        }
3170    }
3171
    /// Attempts to serialize a [`LazyMemoryExec`] as a `GenerateSeries`
    /// protobuf node.
    ///
    /// Returns `Ok(None)` — so an extension codec can take over — when the
    /// exec does not hold exactly one generator, or when the generator is
    /// not one of the known generate_series/range state types.
    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
        let generators = exec.generators();

        // ensure we only have one generator
        let [generator] = generators.as_slice() else {
            return Ok(None);
        };

        // Generators are shared behind a lock; hold a read guard for the
        // duration of the downcast checks below.
        let generator_guard = generator.read();

        // Try to downcast to different generate_series types

        // Case 1: empty series (NULL argument) — only the function name is
        // serialized.
        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: 8192, // Default batch size
                args: Some(protobuf::generate_series_node::Args::ContainsNull(
                    protobuf::GenerateSeriesArgsContainsNull {
                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 2: integer series — start/end/step are plain i64 values.
        if let Some(int_64) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<i64>>()
        {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: int_64.batch_size() as u32,
                args: Some(protobuf::generate_series_node::Args::Int64Args(
                    protobuf::GenerateSeriesArgsInt64 {
                        start: *int_64.start(),
                        end: *int_64.end(),
                        step: *int_64.step(),
                        include_end: int_64.include_end(),
                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 3: timestamp/date series — the step is an interval
        // (months/days/nanos) and the presence of a time zone selects
        // which proto variant is emitted.
        if let Some(timestamp_args) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<TimestampValue>>()
        {
            let schema = exec.schema();

            let start = timestamp_args.start().value();
            let end = timestamp_args.end().value();

            let step_value = timestamp_args.step();

            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
                months: step_value.months,
                days: step_value.days,
                nanos: step_value.nanoseconds,
            });
            let include_end = timestamp_args.include_end();
            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;

            // A time zone implies a timestamp series; without one the value
            // is serialized as a date series.
            let args = match timestamp_args.current().tz_str() {
                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
                    protobuf::GenerateSeriesArgsTimestamp {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                        tz: Some(tz.to_string()),
                    },
                ),
                None => protobuf::generate_series_node::Args::DateArgs(
                    protobuf::GenerateSeriesArgsDate {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                    },
                ),
            };

            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: timestamp_args.batch_size() as u32,
                args: Some(args),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Unknown generator type: let an extension codec handle it.
        Ok(None)
    }
3278
3279    fn try_from_async_func_exec(
3280        exec: &AsyncFuncExec,
3281        extension_codec: &dyn PhysicalExtensionCodec,
3282    ) -> Result<Self> {
3283        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3284            Arc::clone(exec.input()),
3285            extension_codec,
3286        )?;
3287
3288        let mut async_exprs = vec![];
3289        let mut async_expr_names = vec![];
3290
3291        for async_expr in exec.async_exprs() {
3292            async_exprs.push(serialize_physical_expr(&async_expr.func, extension_codec)?);
3293            async_expr_names.push(async_expr.name.clone())
3294        }
3295
3296        Ok(protobuf::PhysicalPlanNode {
3297            physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
3298                protobuf::AsyncFuncExecNode {
3299                    input: Some(Box::new(input)),
3300                    async_exprs,
3301                    async_expr_names,
3302                },
3303            ))),
3304        })
3305    }
3306}
3307
/// Bidirectional conversion between a protobuf plan representation and
/// [`ExecutionPlan`] trees.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes `Self` from a protobuf-encoded byte buffer.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` as protobuf bytes into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Builds an [`ExecutionPlan`] from this protobuf node, using
    /// `extension_codec` to materialize extension nodes and user-defined
    /// functions.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,

        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the protobuf node for `plan`, using `extension_codec` to
    /// serialize extension nodes and user-defined functions.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
3332
/// Serializes and deserializes user-defined physical plan nodes and
/// functions that the built-in protobuf representation cannot express.
///
/// The UDF/UDAF/UDWF `try_encode_*` defaults write nothing and return
/// `Ok(())`, signalling that the function should be resolved by name on the
/// decoding side; the corresponding `try_decode_*` defaults return a
/// "not implemented" error.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom [`ExecutionPlan`] from `buf`; `inputs` are the
    /// already-decoded child plans.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes a custom [`ExecutionPlan`] into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF identified by `name`. Defaults to an error.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF. Default writes nothing (resolve by name).
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom [`PhysicalExpr`]; `_inputs` are the already-decoded
    /// child expressions. Defaults to an error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a custom [`PhysicalExpr`]. Defaults to an error.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF identified by `name`. Defaults to an error.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF. Default writes nothing (resolve by name).
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF identified by `name`. Defaults to an error.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF. Default writes nothing (resolve by name).
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
3385
/// A [`PhysicalExtensionCodec`] that rejects every plan (de)serialization
/// request with a "not implemented" error, while keeping the trait's
/// defaults for UDFs (encode nothing, resolve by name). Suitable when no
/// extension plan nodes are expected.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
3388
3389impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
3390    fn try_decode(
3391        &self,
3392        _buf: &[u8],
3393        _inputs: &[Arc<dyn ExecutionPlan>],
3394        _ctx: &TaskContext,
3395    ) -> Result<Arc<dyn ExecutionPlan>> {
3396        not_impl_err!("PhysicalExtensionCodec is not provided")
3397    }
3398
3399    fn try_encode(
3400        &self,
3401        _node: Arc<dyn ExecutionPlan>,
3402        _buf: &mut Vec<u8>,
3403    ) -> Result<()> {
3404        not_impl_err!("PhysicalExtensionCodec is not provided")
3405    }
3406}
3407
/// DataEncoderTuple captures the position of the encoder
/// in the codec list that was used to encode the data and actual encoded data
///
/// Wire format used by `ComposedPhysicalExtensionCodec`: prost derives the
/// protobuf encoding from the field attributes, so the tag numbers below
/// must never change.
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// The position of encoder used to encode data
    /// (to be used for decoding)
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// The payload produced by the codec at `encoder_position`.
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
3420
/// A PhysicalExtensionCodec that tries one of multiple inner codecs
/// until one works
///
/// Encoded payloads are wrapped in a `DataEncoderTuple` that records which
/// inner codec produced them, so decoding routes back to the same codec.
/// The codec list must therefore be identical (same codecs, same order) on
/// the encoding and decoding sides.
#[derive(Debug)]
pub struct ComposedPhysicalExtensionCodec {
    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}
3427
3428impl ComposedPhysicalExtensionCodec {
3429    // Position in this codecs list is important as it will be used for decoding.
3430    // If new codec is added it should go to last position.
3431    pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
3432        Self { codecs }
3433    }
3434
3435    fn decode_protobuf<R>(
3436        &self,
3437        buf: &[u8],
3438        decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
3439    ) -> Result<R> {
3440        let proto =
3441            DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;
3442
3443        let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
3444            internal_datafusion_err!("Can't find required codec in codec list"),
3445        )?;
3446
3447        decode(codec.as_ref(), &proto.blob)
3448    }
3449
3450    fn encode_protobuf(
3451        &self,
3452        buf: &mut Vec<u8>,
3453        mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
3454    ) -> Result<()> {
3455        let mut data = vec![];
3456        let mut last_err = None;
3457        let mut encoder_position = None;
3458
3459        // find the encoder
3460        for (position, codec) in self.codecs.iter().enumerate() {
3461            match encode(codec.as_ref(), &mut data) {
3462                Ok(_) => {
3463                    encoder_position = Some(position as u32);
3464                    break;
3465                }
3466                Err(err) => last_err = Some(err),
3467            }
3468        }
3469
3470        let encoder_position = encoder_position.ok_or_else(|| {
3471            last_err.unwrap_or_else(|| {
3472                DataFusionError::NotImplemented(
3473                    "Empty list of composed codecs".to_owned(),
3474                )
3475            })
3476        })?;
3477
3478        // encode with encoder position
3479        let proto = DataEncoderTuple {
3480            encoder_position,
3481            blob: data,
3482        };
3483        proto
3484            .encode(buf)
3485            .map_err(|e| internal_datafusion_err!("{e}"))
3486    }
3487}
3488
3489impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
3490    fn try_decode(
3491        &self,
3492        buf: &[u8],
3493        inputs: &[Arc<dyn ExecutionPlan>],
3494        ctx: &TaskContext,
3495    ) -> Result<Arc<dyn ExecutionPlan>> {
3496        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
3497    }
3498
3499    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
3500        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
3501    }
3502
3503    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
3504        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
3505    }
3506
3507    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
3508        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
3509    }
3510
3511    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
3512        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
3513    }
3514
3515    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
3516        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
3517    }
3518}
3519
3520fn into_physical_plan(
3521    node: &Option<Box<protobuf::PhysicalPlanNode>>,
3522    ctx: &TaskContext,
3523    extension_codec: &dyn PhysicalExtensionCodec,
3524) -> Result<Arc<dyn ExecutionPlan>> {
3525    if let Some(field) = node {
3526        field.try_into_physical_plan(ctx, extension_codec)
3527    } else {
3528        Err(proto_error("Missing required field in protobuf"))
3529    }
3530}