datafusion_proto/physical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26    parse_physical_window_expr, parse_protobuf_file_scan_config, parse_record_batches,
27};
28use crate::physical_plan::to_proto::{
29    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
30    serialize_physical_sort_exprs, serialize_physical_window_expr,
31    serialize_record_batches,
32};
33use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
34use crate::protobuf::physical_expr_node::ExprType;
35use crate::protobuf::physical_plan_node::PhysicalPlanType;
36use crate::protobuf::{
37    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest, SortExprNode,
38    SortMergeJoinExecNode,
39};
40use crate::{convert_required, into_required};
41
42use arrow::compute::SortOptions;
43use arrow::datatypes::{IntervalMonthDayNanoType, SchemaRef};
44use datafusion_catalog::memory::MemorySourceConfig;
45use datafusion_common::{
46    internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result,
47};
48#[cfg(feature = "parquet")]
49use datafusion_datasource::file::FileSource;
50use datafusion_datasource::file_compression_type::FileCompressionType;
51use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
52use datafusion_datasource::sink::DataSinkExec;
53use datafusion_datasource::source::{DataSource, DataSourceExec};
54#[cfg(feature = "avro")]
55use datafusion_datasource_avro::source::AvroSource;
56use datafusion_datasource_csv::file_format::CsvSink;
57use datafusion_datasource_csv::source::CsvSource;
58use datafusion_datasource_json::file_format::JsonSink;
59use datafusion_datasource_json::source::JsonSource;
60#[cfg(feature = "parquet")]
61use datafusion_datasource_parquet::file_format::ParquetSink;
62#[cfg(feature = "parquet")]
63use datafusion_datasource_parquet::source::ParquetSource;
64use datafusion_execution::{FunctionRegistry, TaskContext};
65use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
66use datafusion_functions_table::generate_series::{
67    Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
68};
69use datafusion_physical_expr::aggregate::AggregateExprBuilder;
70use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
71use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
72use datafusion_physical_plan::aggregates::AggregateMode;
73use datafusion_physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
74use datafusion_physical_plan::analyze::AnalyzeExec;
75use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
76use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
77use datafusion_physical_plan::coop::CooperativeExec;
78use datafusion_physical_plan::empty::EmptyExec;
79use datafusion_physical_plan::explain::ExplainExec;
80use datafusion_physical_plan::expressions::PhysicalSortExpr;
81use datafusion_physical_plan::filter::FilterExec;
82use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
83use datafusion_physical_plan::joins::{
84    CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode,
85    SymmetricHashJoinExec,
86};
87use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
88use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
89use datafusion_physical_plan::memory::LazyMemoryExec;
90use datafusion_physical_plan::metrics::MetricType;
91use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
92use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
93use datafusion_physical_plan::repartition::RepartitionExec;
94use datafusion_physical_plan::sorts::sort::SortExec;
95use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
96use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
97use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
98use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
99use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
100
101use prost::bytes::BufMut;
102use prost::Message;
103
104pub mod from_proto;
105pub mod to_proto;
106
107impl AsExecutionPlan for protobuf::PhysicalPlanNode {
108    fn try_decode(buf: &[u8]) -> Result<Self>
109    where
110        Self: Sized,
111    {
112        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
113            internal_datafusion_err!("failed to decode physical plan: {e:?}")
114        })
115    }
116
117    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
118    where
119        B: BufMut,
120        Self: Sized,
121    {
122        self.encode(buf).map_err(|e| {
123            internal_datafusion_err!("failed to encode physical plan: {e:?}")
124        })
125    }
126
127    fn try_into_physical_plan(
128        &self,
129        ctx: &TaskContext,
130
131        extension_codec: &dyn PhysicalExtensionCodec,
132    ) -> Result<Arc<dyn ExecutionPlan>> {
133        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
134            proto_error(format!(
135                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
136            ))
137        })?;
138        match plan {
139            PhysicalPlanType::Explain(explain) => {
140                self.try_into_explain_physical_plan(explain, ctx, extension_codec)
141            }
142            PhysicalPlanType::Projection(projection) => {
143                self.try_into_projection_physical_plan(projection, ctx, extension_codec)
144            }
145            PhysicalPlanType::Filter(filter) => {
146                self.try_into_filter_physical_plan(filter, ctx, extension_codec)
147            }
148            PhysicalPlanType::CsvScan(scan) => {
149                self.try_into_csv_scan_physical_plan(scan, ctx, extension_codec)
150            }
151            PhysicalPlanType::JsonScan(scan) => {
152                self.try_into_json_scan_physical_plan(scan, ctx, extension_codec)
153            }
154            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
155            PhysicalPlanType::ParquetScan(scan) => {
156                self.try_into_parquet_scan_physical_plan(scan, ctx, extension_codec)
157            }
158            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
159            PhysicalPlanType::AvroScan(scan) => {
160                self.try_into_avro_scan_physical_plan(scan, ctx, extension_codec)
161            }
162            PhysicalPlanType::MemoryScan(scan) => {
163                self.try_into_memory_scan_physical_plan(scan, ctx, extension_codec)
164            }
165            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
166                .try_into_coalesce_batches_physical_plan(
167                    coalesce_batches,
168                    ctx,
169                    extension_codec,
170                ),
171            PhysicalPlanType::Merge(merge) => {
172                self.try_into_merge_physical_plan(merge, ctx, extension_codec)
173            }
174            PhysicalPlanType::Repartition(repart) => {
175                self.try_into_repartition_physical_plan(repart, ctx, extension_codec)
176            }
177            PhysicalPlanType::GlobalLimit(limit) => {
178                self.try_into_global_limit_physical_plan(limit, ctx, extension_codec)
179            }
180            PhysicalPlanType::LocalLimit(limit) => {
181                self.try_into_local_limit_physical_plan(limit, ctx, extension_codec)
182            }
183            PhysicalPlanType::Window(window_agg) => {
184                self.try_into_window_physical_plan(window_agg, ctx, extension_codec)
185            }
186            PhysicalPlanType::Aggregate(hash_agg) => {
187                self.try_into_aggregate_physical_plan(hash_agg, ctx, extension_codec)
188            }
189            PhysicalPlanType::HashJoin(hashjoin) => {
190                self.try_into_hash_join_physical_plan(hashjoin, ctx, extension_codec)
191            }
192            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
193                .try_into_symmetric_hash_join_physical_plan(
194                    sym_join,
195                    ctx,
196                    extension_codec,
197                ),
198            PhysicalPlanType::Union(union) => {
199                self.try_into_union_physical_plan(union, ctx, extension_codec)
200            }
201            PhysicalPlanType::Interleave(interleave) => {
202                self.try_into_interleave_physical_plan(interleave, ctx, extension_codec)
203            }
204            PhysicalPlanType::CrossJoin(crossjoin) => {
205                self.try_into_cross_join_physical_plan(crossjoin, ctx, extension_codec)
206            }
207            PhysicalPlanType::Empty(empty) => {
208                self.try_into_empty_physical_plan(empty, ctx, extension_codec)
209            }
210            PhysicalPlanType::PlaceholderRow(placeholder) => self
211                .try_into_placeholder_row_physical_plan(
212                    placeholder,
213                    ctx,
214                    extension_codec,
215                ),
216            PhysicalPlanType::Sort(sort) => {
217                self.try_into_sort_physical_plan(sort, ctx, extension_codec)
218            }
219            PhysicalPlanType::SortPreservingMerge(sort) => self
220                .try_into_sort_preserving_merge_physical_plan(sort, ctx, extension_codec),
221            PhysicalPlanType::Extension(extension) => {
222                self.try_into_extension_physical_plan(extension, ctx, extension_codec)
223            }
224            PhysicalPlanType::NestedLoopJoin(join) => {
225                self.try_into_nested_loop_join_physical_plan(join, ctx, extension_codec)
226            }
227            PhysicalPlanType::Analyze(analyze) => {
228                self.try_into_analyze_physical_plan(analyze, ctx, extension_codec)
229            }
230            PhysicalPlanType::JsonSink(sink) => {
231                self.try_into_json_sink_physical_plan(sink, ctx, extension_codec)
232            }
233            PhysicalPlanType::CsvSink(sink) => {
234                self.try_into_csv_sink_physical_plan(sink, ctx, extension_codec)
235            }
236            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
237            PhysicalPlanType::ParquetSink(sink) => {
238                self.try_into_parquet_sink_physical_plan(sink, ctx, extension_codec)
239            }
240            PhysicalPlanType::Unnest(unnest) => {
241                self.try_into_unnest_physical_plan(unnest, ctx, extension_codec)
242            }
243            PhysicalPlanType::Cooperative(cooperative) => {
244                self.try_into_cooperative_physical_plan(cooperative, ctx, extension_codec)
245            }
246            PhysicalPlanType::GenerateSeries(generate_series) => {
247                self.try_into_generate_series_physical_plan(generate_series)
248            }
249            PhysicalPlanType::SortMergeJoin(sort_join) => {
250                self.try_into_sort_join(sort_join, ctx, extension_codec)
251            }
252        }
253    }
254
255    fn try_from_physical_plan(
256        plan: Arc<dyn ExecutionPlan>,
257        extension_codec: &dyn PhysicalExtensionCodec,
258    ) -> Result<Self>
259    where
260        Self: Sized,
261    {
262        let plan_clone = Arc::clone(&plan);
263        let plan = plan.as_any();
264
265        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
266            return protobuf::PhysicalPlanNode::try_from_explain_exec(
267                exec,
268                extension_codec,
269            );
270        }
271
272        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
273            return protobuf::PhysicalPlanNode::try_from_projection_exec(
274                exec,
275                extension_codec,
276            );
277        }
278
279        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
280            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
281                exec,
282                extension_codec,
283            );
284        }
285
286        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
287            return protobuf::PhysicalPlanNode::try_from_filter_exec(
288                exec,
289                extension_codec,
290            );
291        }
292
293        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
294            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
295                limit,
296                extension_codec,
297            );
298        }
299
300        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
301            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
302                limit,
303                extension_codec,
304            );
305        }
306
307        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
308            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
309                exec,
310                extension_codec,
311            );
312        }
313
314        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
315            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
316                exec,
317                extension_codec,
318            );
319        }
320
321        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
322            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
323                exec,
324                extension_codec,
325            );
326        }
327
328        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
329            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
330                exec,
331                extension_codec,
332            );
333        }
334
335        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
336            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
337                exec,
338                extension_codec,
339            );
340        }
341
342        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
343            return protobuf::PhysicalPlanNode::try_from_empty_exec(
344                empty,
345                extension_codec,
346            );
347        }
348
349        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
350            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
351                empty,
352                extension_codec,
353            );
354        }
355
356        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
357            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
358                coalesce_batches,
359                extension_codec,
360            );
361        }
362
363        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
364            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
365                data_source_exec,
366                extension_codec,
367            )? {
368                return Ok(node);
369            }
370        }
371
372        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
373            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
374                exec,
375                extension_codec,
376            );
377        }
378
379        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
380            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
381                exec,
382                extension_codec,
383            );
384        }
385
386        if let Some(exec) = plan.downcast_ref::<SortExec>() {
387            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
388        }
389
390        if let Some(union) = plan.downcast_ref::<UnionExec>() {
391            return protobuf::PhysicalPlanNode::try_from_union_exec(
392                union,
393                extension_codec,
394            );
395        }
396
397        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
398            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
399                interleave,
400                extension_codec,
401            );
402        }
403
404        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
405            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
406                exec,
407                extension_codec,
408            );
409        }
410
411        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
412            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
413                exec,
414                extension_codec,
415            );
416        }
417
418        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
419            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
420                exec,
421                extension_codec,
422            );
423        }
424
425        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
426            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
427                exec,
428                extension_codec,
429            );
430        }
431
432        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
433            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
434                exec,
435                extension_codec,
436            )? {
437                return Ok(node);
438            }
439        }
440
441        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
442            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
443                exec,
444                extension_codec,
445            );
446        }
447
448        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
449            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
450                exec,
451                extension_codec,
452            );
453        }
454
455        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>() {
456            if let Some(node) =
457                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
458            {
459                return Ok(node);
460            }
461        }
462
463        let mut buf: Vec<u8> = vec![];
464        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
465            Ok(_) => {
466                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
467                    .children()
468                    .into_iter()
469                    .cloned()
470                    .map(|i| {
471                        protobuf::PhysicalPlanNode::try_from_physical_plan(
472                            i,
473                            extension_codec,
474                        )
475                    })
476                    .collect::<Result<_>>()?;
477
478                Ok(protobuf::PhysicalPlanNode {
479                    physical_plan_type: Some(PhysicalPlanType::Extension(
480                        protobuf::PhysicalExtensionNode { node: buf, inputs },
481                    )),
482                })
483            }
484            Err(e) => internal_err!(
485                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
486            ),
487        }
488    }
489}
490
491impl protobuf::PhysicalPlanNode {
492    fn try_into_explain_physical_plan(
493        &self,
494        explain: &protobuf::ExplainExecNode,
495        _ctx: &TaskContext,
496
497        _extension_codec: &dyn PhysicalExtensionCodec,
498    ) -> Result<Arc<dyn ExecutionPlan>> {
499        Ok(Arc::new(ExplainExec::new(
500            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
501            explain
502                .stringified_plans
503                .iter()
504                .map(|plan| plan.into())
505                .collect(),
506            explain.verbose,
507        )))
508    }
509
510    fn try_into_projection_physical_plan(
511        &self,
512        projection: &protobuf::ProjectionExecNode,
513        ctx: &TaskContext,
514
515        extension_codec: &dyn PhysicalExtensionCodec,
516    ) -> Result<Arc<dyn ExecutionPlan>> {
517        let input: Arc<dyn ExecutionPlan> =
518            into_physical_plan(&projection.input, ctx, extension_codec)?;
519        let exprs = projection
520            .expr
521            .iter()
522            .zip(projection.expr_name.iter())
523            .map(|(expr, name)| {
524                Ok((
525                    parse_physical_expr(
526                        expr,
527                        ctx,
528                        input.schema().as_ref(),
529                        extension_codec,
530                    )?,
531                    name.to_string(),
532                ))
533            })
534            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
535        let proj_exprs: Vec<ProjectionExpr> = exprs
536            .into_iter()
537            .map(|(expr, alias)| ProjectionExpr { expr, alias })
538            .collect();
539        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
540    }
541
542    fn try_into_filter_physical_plan(
543        &self,
544        filter: &protobuf::FilterExecNode,
545        ctx: &TaskContext,
546
547        extension_codec: &dyn PhysicalExtensionCodec,
548    ) -> Result<Arc<dyn ExecutionPlan>> {
549        let input: Arc<dyn ExecutionPlan> =
550            into_physical_plan(&filter.input, ctx, extension_codec)?;
551
552        let predicate = filter
553            .expr
554            .as_ref()
555            .map(|expr| {
556                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
557            })
558            .transpose()?
559            .ok_or_else(|| {
560                internal_datafusion_err!(
561                    "filter (FilterExecNode) in PhysicalPlanNode is missing."
562                )
563            })?;
564
565        let filter_selectivity = filter.default_filter_selectivity.try_into();
566        let projection = if !filter.projection.is_empty() {
567            Some(
568                filter
569                    .projection
570                    .iter()
571                    .map(|i| *i as usize)
572                    .collect::<Vec<_>>(),
573            )
574        } else {
575            None
576        };
577
578        let filter =
579            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
580        match filter_selectivity {
581            Ok(filter_selectivity) => Ok(Arc::new(
582                filter.with_default_selectivity(filter_selectivity)?,
583            )),
584            Err(_) => Err(internal_datafusion_err!(
585                "filter_selectivity in PhysicalPlanNode is invalid "
586            )),
587        }
588    }
589
590    fn try_into_csv_scan_physical_plan(
591        &self,
592        scan: &protobuf::CsvScanExecNode,
593        ctx: &TaskContext,
594
595        extension_codec: &dyn PhysicalExtensionCodec,
596    ) -> Result<Arc<dyn ExecutionPlan>> {
597        let escape =
598            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
599                &scan.optional_escape
600            {
601                Some(str_to_byte(escape, "escape")?)
602            } else {
603                None
604            };
605
606        let comment = if let Some(
607            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
608        ) = &scan.optional_comment
609        {
610            Some(str_to_byte(comment, "comment")?)
611        } else {
612            None
613        };
614
615        let source = Arc::new(
616            CsvSource::new(
617                scan.has_header,
618                str_to_byte(&scan.delimiter, "delimiter")?,
619                0,
620            )
621            .with_escape(escape)
622            .with_comment(comment),
623        );
624
625        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
626            scan.base_conf.as_ref().unwrap(),
627            ctx,
628            extension_codec,
629            source,
630        )?)
631        .with_newlines_in_values(scan.newlines_in_values)
632        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
633        .build();
634        Ok(DataSourceExec::from_data_source(conf))
635    }
636
637    fn try_into_json_scan_physical_plan(
638        &self,
639        scan: &protobuf::JsonScanExecNode,
640        ctx: &TaskContext,
641
642        extension_codec: &dyn PhysicalExtensionCodec,
643    ) -> Result<Arc<dyn ExecutionPlan>> {
644        let scan_conf = parse_protobuf_file_scan_config(
645            scan.base_conf.as_ref().unwrap(),
646            ctx,
647            extension_codec,
648            Arc::new(JsonSource::new()),
649        )?;
650        Ok(DataSourceExec::from_data_source(scan_conf))
651    }
652
653    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
654    fn try_into_parquet_scan_physical_plan(
655        &self,
656        scan: &protobuf::ParquetScanExecNode,
657        ctx: &TaskContext,
658
659        extension_codec: &dyn PhysicalExtensionCodec,
660    ) -> Result<Arc<dyn ExecutionPlan>> {
661        #[cfg(feature = "parquet")]
662        {
663            let schema = from_proto::parse_protobuf_file_scan_schema(
664                scan.base_conf.as_ref().unwrap(),
665            )?;
666
667            // Check if there's a projection and use projected schema for predicate parsing
668            let base_conf = scan.base_conf.as_ref().unwrap();
669            let predicate_schema = if !base_conf.projection.is_empty() {
670                // Create projected schema for parsing the predicate
671                let projected_fields: Vec<_> = base_conf
672                    .projection
673                    .iter()
674                    .map(|&i| schema.field(i as usize).clone())
675                    .collect();
676                Arc::new(arrow::datatypes::Schema::new(projected_fields))
677            } else {
678                schema
679            };
680
681            let predicate = scan
682                .predicate
683                .as_ref()
684                .map(|expr| {
685                    parse_physical_expr(
686                        expr,
687                        ctx,
688                        predicate_schema.as_ref(),
689                        extension_codec,
690                    )
691                })
692                .transpose()?;
693            let mut options = datafusion_common::config::TableParquetOptions::default();
694
695            if let Some(table_options) = scan.parquet_options.as_ref() {
696                options = table_options.try_into()?;
697            }
698            let mut source = ParquetSource::new(options);
699
700            if let Some(predicate) = predicate {
701                source = source.with_predicate(predicate);
702            }
703            let base_config = parse_protobuf_file_scan_config(
704                base_conf,
705                ctx,
706                extension_codec,
707                Arc::new(source),
708            )?;
709            Ok(DataSourceExec::from_data_source(base_config))
710        }
711        #[cfg(not(feature = "parquet"))]
712        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
713    }
714
715    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
716    fn try_into_avro_scan_physical_plan(
717        &self,
718        scan: &protobuf::AvroScanExecNode,
719        ctx: &TaskContext,
720
721        extension_codec: &dyn PhysicalExtensionCodec,
722    ) -> Result<Arc<dyn ExecutionPlan>> {
723        #[cfg(feature = "avro")]
724        {
725            let conf = parse_protobuf_file_scan_config(
726                scan.base_conf.as_ref().unwrap(),
727                ctx,
728                extension_codec,
729                Arc::new(AvroSource::new()),
730            )?;
731            Ok(DataSourceExec::from_data_source(conf))
732        }
733        #[cfg(not(feature = "avro"))]
734        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
735    }
736
737    fn try_into_memory_scan_physical_plan(
738        &self,
739        scan: &protobuf::MemoryScanExecNode,
740        ctx: &TaskContext,
741
742        extension_codec: &dyn PhysicalExtensionCodec,
743    ) -> Result<Arc<dyn ExecutionPlan>> {
744        let partitions = scan
745            .partitions
746            .iter()
747            .map(|p| parse_record_batches(p))
748            .collect::<Result<Vec<_>>>()?;
749
750        let proto_schema = scan.schema.as_ref().ok_or_else(|| {
751            internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
752        })?;
753        let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
754
755        let projection = if !scan.projection.is_empty() {
756            Some(
757                scan.projection
758                    .iter()
759                    .map(|i| *i as usize)
760                    .collect::<Vec<_>>(),
761            )
762        } else {
763            None
764        };
765
766        let mut sort_information = vec![];
767        for ordering in &scan.sort_information {
768            let sort_exprs = parse_physical_sort_exprs(
769                &ordering.physical_sort_expr_nodes,
770                ctx,
771                &schema,
772                extension_codec,
773            )?;
774            sort_information.extend(LexOrdering::new(sort_exprs));
775        }
776
777        let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
778            .with_limit(scan.fetch.map(|f| f as usize))
779            .with_show_sizes(scan.show_sizes);
780
781        let source = source.try_with_sort_information(sort_information)?;
782
783        Ok(DataSourceExec::from_data_source(source))
784    }
785
786    fn try_into_coalesce_batches_physical_plan(
787        &self,
788        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
789        ctx: &TaskContext,
790
791        extension_codec: &dyn PhysicalExtensionCodec,
792    ) -> Result<Arc<dyn ExecutionPlan>> {
793        let input: Arc<dyn ExecutionPlan> =
794            into_physical_plan(&coalesce_batches.input, ctx, extension_codec)?;
795        Ok(Arc::new(
796            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
797                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
798        ))
799    }
800
801    fn try_into_merge_physical_plan(
802        &self,
803        merge: &protobuf::CoalescePartitionsExecNode,
804        ctx: &TaskContext,
805
806        extension_codec: &dyn PhysicalExtensionCodec,
807    ) -> Result<Arc<dyn ExecutionPlan>> {
808        let input: Arc<dyn ExecutionPlan> =
809            into_physical_plan(&merge.input, ctx, extension_codec)?;
810        Ok(Arc::new(
811            CoalescePartitionsExec::new(input)
812                .with_fetch(merge.fetch.map(|f| f as usize)),
813        ))
814    }
815
816    fn try_into_repartition_physical_plan(
817        &self,
818        repart: &protobuf::RepartitionExecNode,
819        ctx: &TaskContext,
820
821        extension_codec: &dyn PhysicalExtensionCodec,
822    ) -> Result<Arc<dyn ExecutionPlan>> {
823        let input: Arc<dyn ExecutionPlan> =
824            into_physical_plan(&repart.input, ctx, extension_codec)?;
825        let partitioning = parse_protobuf_partitioning(
826            repart.partitioning.as_ref(),
827            ctx,
828            input.schema().as_ref(),
829            extension_codec,
830        )?;
831        Ok(Arc::new(RepartitionExec::try_new(
832            input,
833            partitioning.unwrap(),
834        )?))
835    }
836
837    fn try_into_global_limit_physical_plan(
838        &self,
839        limit: &protobuf::GlobalLimitExecNode,
840        ctx: &TaskContext,
841
842        extension_codec: &dyn PhysicalExtensionCodec,
843    ) -> Result<Arc<dyn ExecutionPlan>> {
844        let input: Arc<dyn ExecutionPlan> =
845            into_physical_plan(&limit.input, ctx, extension_codec)?;
846        let fetch = if limit.fetch >= 0 {
847            Some(limit.fetch as usize)
848        } else {
849            None
850        };
851        Ok(Arc::new(GlobalLimitExec::new(
852            input,
853            limit.skip as usize,
854            fetch,
855        )))
856    }
857
858    fn try_into_local_limit_physical_plan(
859        &self,
860        limit: &protobuf::LocalLimitExecNode,
861        ctx: &TaskContext,
862
863        extension_codec: &dyn PhysicalExtensionCodec,
864    ) -> Result<Arc<dyn ExecutionPlan>> {
865        let input: Arc<dyn ExecutionPlan> =
866            into_physical_plan(&limit.input, ctx, extension_codec)?;
867        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
868    }
869
870    fn try_into_window_physical_plan(
871        &self,
872        window_agg: &protobuf::WindowAggExecNode,
873        ctx: &TaskContext,
874
875        extension_codec: &dyn PhysicalExtensionCodec,
876    ) -> Result<Arc<dyn ExecutionPlan>> {
877        let input: Arc<dyn ExecutionPlan> =
878            into_physical_plan(&window_agg.input, ctx, extension_codec)?;
879        let input_schema = input.schema();
880
881        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
882            .window_expr
883            .iter()
884            .map(|window_expr| {
885                parse_physical_window_expr(
886                    window_expr,
887                    ctx,
888                    input_schema.as_ref(),
889                    extension_codec,
890                )
891            })
892            .collect::<Result<Vec<_>, _>>()?;
893
894        let partition_keys = window_agg
895            .partition_keys
896            .iter()
897            .map(|expr| {
898                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
899            })
900            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
901
902        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
903            let input_order_mode = match input_order_mode {
904                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
905                window_agg_exec_node::InputOrderMode::PartiallySorted(
906                    protobuf::PartiallySortedInputOrderMode { columns },
907                ) => InputOrderMode::PartiallySorted(
908                    columns.iter().map(|c| *c as usize).collect(),
909                ),
910                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
911            };
912
913            Ok(Arc::new(BoundedWindowAggExec::try_new(
914                physical_window_expr,
915                input,
916                input_order_mode,
917                !partition_keys.is_empty(),
918            )?))
919        } else {
920            Ok(Arc::new(WindowAggExec::try_new(
921                physical_window_expr,
922                input,
923                !partition_keys.is_empty(),
924            )?))
925        }
926    }
927
928    fn try_into_aggregate_physical_plan(
929        &self,
930        hash_agg: &protobuf::AggregateExecNode,
931        ctx: &TaskContext,
932
933        extension_codec: &dyn PhysicalExtensionCodec,
934    ) -> Result<Arc<dyn ExecutionPlan>> {
935        let input: Arc<dyn ExecutionPlan> =
936            into_physical_plan(&hash_agg.input, ctx, extension_codec)?;
937        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
938            proto_error(format!(
939                "Received a AggregateNode message with unknown AggregateMode {}",
940                hash_agg.mode
941            ))
942        })?;
943        let agg_mode: AggregateMode = match mode {
944            protobuf::AggregateMode::Partial => AggregateMode::Partial,
945            protobuf::AggregateMode::Final => AggregateMode::Final,
946            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
947            protobuf::AggregateMode::Single => AggregateMode::Single,
948            protobuf::AggregateMode::SinglePartitioned => {
949                AggregateMode::SinglePartitioned
950            }
951        };
952
953        let num_expr = hash_agg.group_expr.len();
954
955        let group_expr = hash_agg
956            .group_expr
957            .iter()
958            .zip(hash_agg.group_expr_name.iter())
959            .map(|(expr, name)| {
960                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
961                    .map(|expr| (expr, name.to_string()))
962            })
963            .collect::<Result<Vec<_>, _>>()?;
964
965        let null_expr = hash_agg
966            .null_expr
967            .iter()
968            .zip(hash_agg.group_expr_name.iter())
969            .map(|(expr, name)| {
970                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
971                    .map(|expr| (expr, name.to_string()))
972            })
973            .collect::<Result<Vec<_>, _>>()?;
974
975        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
976            hash_agg
977                .groups
978                .chunks(num_expr)
979                .map(|g| g.to_vec())
980                .collect::<Vec<Vec<bool>>>()
981        } else {
982            vec![]
983        };
984
985        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
986            internal_datafusion_err!("input_schema in AggregateNode is missing.")
987        })?;
988        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
989
990        let physical_filter_expr = hash_agg
991            .filter_expr
992            .iter()
993            .map(|expr| {
994                expr.expr
995                    .as_ref()
996                    .map(|e| {
997                        parse_physical_expr(e, ctx, &physical_schema, extension_codec)
998                    })
999                    .transpose()
1000            })
1001            .collect::<Result<Vec<_>, _>>()?;
1002
1003        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1004            .aggr_expr
1005            .iter()
1006            .zip(hash_agg.aggr_expr_name.iter())
1007            .map(|(expr, name)| {
1008                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1009                    proto_error("Unexpected empty aggregate physical expression")
1010                })?;
1011
1012                match expr_type {
1013                    ExprType::AggregateExpr(agg_node) => {
1014                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1015                            .expr
1016                            .iter()
1017                            .map(|e| {
1018                                parse_physical_expr(
1019                                    e,
1020                                    ctx,
1021                                    &physical_schema,
1022                                    extension_codec,
1023                                )
1024                            })
1025                            .collect::<Result<Vec<_>>>()?;
1026                        let order_bys = agg_node
1027                            .ordering_req
1028                            .iter()
1029                            .map(|e| {
1030                                parse_physical_sort_expr(
1031                                    e,
1032                                    ctx,
1033                                    &physical_schema,
1034                                    extension_codec,
1035                                )
1036                            })
1037                            .collect::<Result<_>>()?;
1038                        agg_node
1039                            .aggregate_function
1040                            .as_ref()
1041                            .map(|func| match func {
1042                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1043                                    let agg_udf = match &agg_node.fun_definition {
1044                                        Some(buf) => extension_codec
1045                                            .try_decode_udaf(udaf_name, buf)?,
1046                                        None => ctx.udaf(udaf_name).or_else(|_| {
1047                                            extension_codec
1048                                                .try_decode_udaf(udaf_name, &[])
1049                                        })?,
1050                                    };
1051
1052                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
1053                                        .schema(Arc::clone(&physical_schema))
1054                                        .alias(name)
1055                                        .human_display(agg_node.human_display.clone())
1056                                        .with_ignore_nulls(agg_node.ignore_nulls)
1057                                        .with_distinct(agg_node.distinct)
1058                                        .order_by(order_bys)
1059                                        .build()
1060                                        .map(Arc::new)
1061                                }
1062                            })
1063                            .transpose()?
1064                            .ok_or_else(|| {
1065                                proto_error(
1066                                    "Invalid AggregateExpr, missing aggregate_function",
1067                                )
1068                            })
1069                    }
1070                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1071                }
1072            })
1073            .collect::<Result<Vec<_>, _>>()?;
1074
1075        let limit = hash_agg
1076            .limit
1077            .as_ref()
1078            .map(|lit_value| lit_value.limit as usize);
1079
1080        let agg = AggregateExec::try_new(
1081            agg_mode,
1082            PhysicalGroupBy::new(group_expr, null_expr, groups),
1083            physical_aggr_expr,
1084            physical_filter_expr,
1085            input,
1086            physical_schema,
1087        )?;
1088
1089        let agg = agg.with_limit(limit);
1090
1091        Ok(Arc::new(agg))
1092    }
1093
1094    fn try_into_hash_join_physical_plan(
1095        &self,
1096        hashjoin: &protobuf::HashJoinExecNode,
1097        ctx: &TaskContext,
1098
1099        extension_codec: &dyn PhysicalExtensionCodec,
1100    ) -> Result<Arc<dyn ExecutionPlan>> {
1101        let left: Arc<dyn ExecutionPlan> =
1102            into_physical_plan(&hashjoin.left, ctx, extension_codec)?;
1103        let right: Arc<dyn ExecutionPlan> =
1104            into_physical_plan(&hashjoin.right, ctx, extension_codec)?;
1105        let left_schema = left.schema();
1106        let right_schema = right.schema();
1107        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1108            .on
1109            .iter()
1110            .map(|col| {
1111                let left = parse_physical_expr(
1112                    &col.left.clone().unwrap(),
1113                    ctx,
1114                    left_schema.as_ref(),
1115                    extension_codec,
1116                )?;
1117                let right = parse_physical_expr(
1118                    &col.right.clone().unwrap(),
1119                    ctx,
1120                    right_schema.as_ref(),
1121                    extension_codec,
1122                )?;
1123                Ok((left, right))
1124            })
1125            .collect::<Result<_>>()?;
1126        let join_type =
1127            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1128                proto_error(format!(
1129                    "Received a HashJoinNode message with unknown JoinType {}",
1130                    hashjoin.join_type
1131                ))
1132            })?;
1133        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1134            .map_err(|_| {
1135                proto_error(format!(
1136                    "Received a HashJoinNode message with unknown NullEquality {}",
1137                    hashjoin.null_equality
1138                ))
1139            })?;
1140        let filter = hashjoin
1141            .filter
1142            .as_ref()
1143            .map(|f| {
1144                let schema = f
1145                    .schema
1146                    .as_ref()
1147                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1148                    .try_into()?;
1149
1150                let expression = parse_physical_expr(
1151                    f.expression.as_ref().ok_or_else(|| {
1152                        proto_error("Unexpected empty filter expression")
1153                    })?,
1154                    ctx, &schema,
1155                    extension_codec,
1156                )?;
1157                let column_indices = f.column_indices
1158                    .iter()
1159                    .map(|i| {
1160                        let side = protobuf::JoinSide::try_from(i.side)
1161                            .map_err(|_| proto_error(format!(
1162                                "Received a HashJoinNode message with JoinSide in Filter {}",
1163                                i.side))
1164                            )?;
1165
1166                        Ok(ColumnIndex {
1167                            index: i.index as usize,
1168                            side: side.into(),
1169                        })
1170                    })
1171                    .collect::<Result<Vec<_>>>()?;
1172
1173                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1174            })
1175            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1176
1177        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1178            .map_err(|_| {
1179            proto_error(format!(
1180                "Received a HashJoinNode message with unknown PartitionMode {}",
1181                hashjoin.partition_mode
1182            ))
1183        })?;
1184        let partition_mode = match partition_mode {
1185            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1186            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1187            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1188        };
1189        let projection = if !hashjoin.projection.is_empty() {
1190            Some(
1191                hashjoin
1192                    .projection
1193                    .iter()
1194                    .map(|i| *i as usize)
1195                    .collect::<Vec<_>>(),
1196            )
1197        } else {
1198            None
1199        };
1200        Ok(Arc::new(HashJoinExec::try_new(
1201            left,
1202            right,
1203            on,
1204            filter,
1205            &join_type.into(),
1206            projection,
1207            partition_mode,
1208            null_equality.into(),
1209        )?))
1210    }
1211
1212    fn try_into_symmetric_hash_join_physical_plan(
1213        &self,
1214        sym_join: &protobuf::SymmetricHashJoinExecNode,
1215        ctx: &TaskContext,
1216
1217        extension_codec: &dyn PhysicalExtensionCodec,
1218    ) -> Result<Arc<dyn ExecutionPlan>> {
1219        let left = into_physical_plan(&sym_join.left, ctx, extension_codec)?;
1220        let right = into_physical_plan(&sym_join.right, ctx, extension_codec)?;
1221        let left_schema = left.schema();
1222        let right_schema = right.schema();
1223        let on = sym_join
1224            .on
1225            .iter()
1226            .map(|col| {
1227                let left = parse_physical_expr(
1228                    &col.left.clone().unwrap(),
1229                    ctx,
1230                    left_schema.as_ref(),
1231                    extension_codec,
1232                )?;
1233                let right = parse_physical_expr(
1234                    &col.right.clone().unwrap(),
1235                    ctx,
1236                    right_schema.as_ref(),
1237                    extension_codec,
1238                )?;
1239                Ok((left, right))
1240            })
1241            .collect::<Result<_>>()?;
1242        let join_type =
1243            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1244                proto_error(format!(
1245                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1246                    sym_join.join_type
1247                ))
1248            })?;
1249        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1250            .map_err(|_| {
1251                proto_error(format!(
1252                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
1253                    sym_join.null_equality
1254                ))
1255            })?;
1256        let filter = sym_join
1257            .filter
1258            .as_ref()
1259            .map(|f| {
1260                let schema = f
1261                    .schema
1262                    .as_ref()
1263                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1264                    .try_into()?;
1265
1266                let expression = parse_physical_expr(
1267                    f.expression.as_ref().ok_or_else(|| {
1268                        proto_error("Unexpected empty filter expression")
1269                    })?,
1270                    ctx, &schema,
1271                    extension_codec,
1272                )?;
1273                let column_indices = f.column_indices
1274                    .iter()
1275                    .map(|i| {
1276                        let side = protobuf::JoinSide::try_from(i.side)
1277                            .map_err(|_| proto_error(format!(
1278                                "Received a HashJoinNode message with JoinSide in Filter {}",
1279                                i.side))
1280                            )?;
1281
1282                        Ok(ColumnIndex {
1283                            index: i.index as usize,
1284                            side: side.into(),
1285                        })
1286                    })
1287                    .collect::<Result<_>>()?;
1288
1289                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1290            })
1291            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1292
1293        let left_sort_exprs = parse_physical_sort_exprs(
1294            &sym_join.left_sort_exprs,
1295            ctx,
1296            &left_schema,
1297            extension_codec,
1298        )?;
1299        let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1300
1301        let right_sort_exprs = parse_physical_sort_exprs(
1302            &sym_join.right_sort_exprs,
1303            ctx,
1304            &right_schema,
1305            extension_codec,
1306        )?;
1307        let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1308
1309        let partition_mode = protobuf::StreamPartitionMode::try_from(
1310            sym_join.partition_mode,
1311        )
1312        .map_err(|_| {
1313            proto_error(format!(
1314                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1315                sym_join.partition_mode
1316            ))
1317        })?;
1318        let partition_mode = match partition_mode {
1319            protobuf::StreamPartitionMode::SinglePartition => {
1320                StreamJoinPartitionMode::SinglePartition
1321            }
1322            protobuf::StreamPartitionMode::PartitionedExec => {
1323                StreamJoinPartitionMode::Partitioned
1324            }
1325        };
1326        SymmetricHashJoinExec::try_new(
1327            left,
1328            right,
1329            on,
1330            filter,
1331            &join_type.into(),
1332            null_equality.into(),
1333            left_sort_exprs,
1334            right_sort_exprs,
1335            partition_mode,
1336        )
1337        .map(|e| Arc::new(e) as _)
1338    }
1339
1340    fn try_into_union_physical_plan(
1341        &self,
1342        union: &protobuf::UnionExecNode,
1343        ctx: &TaskContext,
1344
1345        extension_codec: &dyn PhysicalExtensionCodec,
1346    ) -> Result<Arc<dyn ExecutionPlan>> {
1347        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1348        for input in &union.inputs {
1349            inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
1350        }
1351        UnionExec::try_new(inputs)
1352    }
1353
1354    fn try_into_interleave_physical_plan(
1355        &self,
1356        interleave: &protobuf::InterleaveExecNode,
1357        ctx: &TaskContext,
1358
1359        extension_codec: &dyn PhysicalExtensionCodec,
1360    ) -> Result<Arc<dyn ExecutionPlan>> {
1361        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1362        for input in &interleave.inputs {
1363            inputs.push(input.try_into_physical_plan(ctx, extension_codec)?);
1364        }
1365        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1366    }
1367
1368    fn try_into_cross_join_physical_plan(
1369        &self,
1370        crossjoin: &protobuf::CrossJoinExecNode,
1371        ctx: &TaskContext,
1372
1373        extension_codec: &dyn PhysicalExtensionCodec,
1374    ) -> Result<Arc<dyn ExecutionPlan>> {
1375        let left: Arc<dyn ExecutionPlan> =
1376            into_physical_plan(&crossjoin.left, ctx, extension_codec)?;
1377        let right: Arc<dyn ExecutionPlan> =
1378            into_physical_plan(&crossjoin.right, ctx, extension_codec)?;
1379        Ok(Arc::new(CrossJoinExec::new(left, right)))
1380    }
1381
1382    fn try_into_empty_physical_plan(
1383        &self,
1384        empty: &protobuf::EmptyExecNode,
1385        _ctx: &TaskContext,
1386
1387        _extension_codec: &dyn PhysicalExtensionCodec,
1388    ) -> Result<Arc<dyn ExecutionPlan>> {
1389        let schema = Arc::new(convert_required!(empty.schema)?);
1390        Ok(Arc::new(EmptyExec::new(schema)))
1391    }
1392
1393    fn try_into_placeholder_row_physical_plan(
1394        &self,
1395        placeholder: &protobuf::PlaceholderRowExecNode,
1396        _ctx: &TaskContext,
1397
1398        _extension_codec: &dyn PhysicalExtensionCodec,
1399    ) -> Result<Arc<dyn ExecutionPlan>> {
1400        let schema = Arc::new(convert_required!(placeholder.schema)?);
1401        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1402    }
1403
1404    fn try_into_sort_physical_plan(
1405        &self,
1406        sort: &protobuf::SortExecNode,
1407        ctx: &TaskContext,
1408
1409        extension_codec: &dyn PhysicalExtensionCodec,
1410    ) -> Result<Arc<dyn ExecutionPlan>> {
1411        let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
1412        let exprs = sort
1413            .expr
1414            .iter()
1415            .map(|expr| {
1416                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1417                    proto_error(format!(
1418                        "physical_plan::from_proto() Unexpected expr {self:?}"
1419                    ))
1420                })?;
1421                if let ExprType::Sort(sort_expr) = expr {
1422                    let expr = sort_expr
1423                        .expr
1424                        .as_ref()
1425                        .ok_or_else(|| {
1426                            proto_error(format!(
1427                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
1428                            ))
1429                        })?
1430                        .as_ref();
1431                    Ok(PhysicalSortExpr {
1432                        expr: parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)?,
1433                        options: SortOptions {
1434                            descending: !sort_expr.asc,
1435                            nulls_first: sort_expr.nulls_first,
1436                        },
1437                    })
1438                } else {
1439                    internal_err!(
1440                        "physical_plan::from_proto() {self:?}"
1441                    )
1442                }
1443            })
1444            .collect::<Result<Vec<_>>>()?;
1445        let Some(ordering) = LexOrdering::new(exprs) else {
1446            return internal_err!("SortExec requires an ordering");
1447        };
1448        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1449        let new_sort = SortExec::new(ordering, input)
1450            .with_fetch(fetch)
1451            .with_preserve_partitioning(sort.preserve_partitioning);
1452
1453        Ok(Arc::new(new_sort))
1454    }
1455
1456    fn try_into_sort_preserving_merge_physical_plan(
1457        &self,
1458        sort: &protobuf::SortPreservingMergeExecNode,
1459        ctx: &TaskContext,
1460
1461        extension_codec: &dyn PhysicalExtensionCodec,
1462    ) -> Result<Arc<dyn ExecutionPlan>> {
1463        let input = into_physical_plan(&sort.input, ctx, extension_codec)?;
1464        let exprs = sort
1465            .expr
1466            .iter()
1467            .map(|expr| {
1468                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1469                    proto_error(format!(
1470                        "physical_plan::from_proto() Unexpected expr {self:?}"
1471                    ))
1472                })?;
1473                if let ExprType::Sort(sort_expr) = expr {
1474                    let expr = sort_expr
1475                        .expr
1476                        .as_ref()
1477                        .ok_or_else(|| {
1478                            proto_error(format!(
1479                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1480                        ))
1481                        })?
1482                        .as_ref();
1483                    Ok(PhysicalSortExpr {
1484                        expr: parse_physical_expr(
1485                            expr,
1486                            ctx,
1487                            input.schema().as_ref(),
1488                            extension_codec,
1489                        )?,
1490                        options: SortOptions {
1491                            descending: !sort_expr.asc,
1492                            nulls_first: sort_expr.nulls_first,
1493                        },
1494                    })
1495                } else {
1496                    internal_err!("physical_plan::from_proto() {self:?}")
1497                }
1498            })
1499            .collect::<Result<Vec<_>>>()?;
1500        let Some(ordering) = LexOrdering::new(exprs) else {
1501            return internal_err!("SortExec requires an ordering");
1502        };
1503        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1504        Ok(Arc::new(
1505            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1506        ))
1507    }
1508
1509    fn try_into_extension_physical_plan(
1510        &self,
1511        extension: &protobuf::PhysicalExtensionNode,
1512        ctx: &TaskContext,
1513
1514        extension_codec: &dyn PhysicalExtensionCodec,
1515    ) -> Result<Arc<dyn ExecutionPlan>> {
1516        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1517            .inputs
1518            .iter()
1519            .map(|i| i.try_into_physical_plan(ctx, extension_codec))
1520            .collect::<Result<_>>()?;
1521
1522        let extension_node =
1523            extension_codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;
1524
1525        Ok(extension_node)
1526    }
1527
1528    fn try_into_nested_loop_join_physical_plan(
1529        &self,
1530        join: &protobuf::NestedLoopJoinExecNode,
1531        ctx: &TaskContext,
1532
1533        extension_codec: &dyn PhysicalExtensionCodec,
1534    ) -> Result<Arc<dyn ExecutionPlan>> {
1535        let left: Arc<dyn ExecutionPlan> =
1536            into_physical_plan(&join.left, ctx, extension_codec)?;
1537        let right: Arc<dyn ExecutionPlan> =
1538            into_physical_plan(&join.right, ctx, extension_codec)?;
1539        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1540            proto_error(format!(
1541                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1542                join.join_type
1543            ))
1544        })?;
1545        let filter = join
1546                    .filter
1547                    .as_ref()
1548                    .map(|f| {
1549                        let schema = f
1550                            .schema
1551                            .as_ref()
1552                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1553                            .try_into()?;
1554
1555                        let expression = parse_physical_expr(
1556                            f.expression.as_ref().ok_or_else(|| {
1557                                proto_error("Unexpected empty filter expression")
1558                            })?,
1559                            ctx, &schema,
1560                            extension_codec,
1561                        )?;
1562                        let column_indices = f.column_indices
1563                            .iter()
1564                            .map(|i| {
1565                                let side = protobuf::JoinSide::try_from(i.side)
1566                                    .map_err(|_| proto_error(format!(
1567                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1568                                        i.side))
1569                                    )?;
1570
1571                                Ok(ColumnIndex {
1572                                    index: i.index as usize,
1573                                    side: side.into(),
1574                                })
1575                            })
1576                            .collect::<Result<Vec<_>>>()?;
1577
1578                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1579                    })
1580                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1581
1582        let projection = if !join.projection.is_empty() {
1583            Some(
1584                join.projection
1585                    .iter()
1586                    .map(|i| *i as usize)
1587                    .collect::<Vec<_>>(),
1588            )
1589        } else {
1590            None
1591        };
1592
1593        Ok(Arc::new(NestedLoopJoinExec::try_new(
1594            left,
1595            right,
1596            filter,
1597            &join_type.into(),
1598            projection,
1599        )?))
1600    }
1601
1602    fn try_into_analyze_physical_plan(
1603        &self,
1604        analyze: &protobuf::AnalyzeExecNode,
1605        ctx: &TaskContext,
1606
1607        extension_codec: &dyn PhysicalExtensionCodec,
1608    ) -> Result<Arc<dyn ExecutionPlan>> {
1609        let input: Arc<dyn ExecutionPlan> =
1610            into_physical_plan(&analyze.input, ctx, extension_codec)?;
1611        Ok(Arc::new(AnalyzeExec::new(
1612            analyze.verbose,
1613            analyze.show_statistics,
1614            vec![MetricType::SUMMARY, MetricType::DEV],
1615            input,
1616            Arc::new(convert_required!(analyze.schema)?),
1617        )))
1618    }
1619
1620    fn try_into_json_sink_physical_plan(
1621        &self,
1622        sink: &protobuf::JsonSinkExecNode,
1623        ctx: &TaskContext,
1624
1625        extension_codec: &dyn PhysicalExtensionCodec,
1626    ) -> Result<Arc<dyn ExecutionPlan>> {
1627        let input = into_physical_plan(&sink.input, ctx, extension_codec)?;
1628
1629        let data_sink: JsonSink = sink
1630            .sink
1631            .as_ref()
1632            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1633            .try_into()?;
1634        let sink_schema = input.schema();
1635        let sort_order = sink
1636            .sort_order
1637            .as_ref()
1638            .map(|collection| {
1639                parse_physical_sort_exprs(
1640                    &collection.physical_sort_expr_nodes,
1641                    ctx,
1642                    &sink_schema,
1643                    extension_codec,
1644                )
1645                .map(|sort_exprs| {
1646                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1647                })
1648            })
1649            .transpose()?
1650            .flatten();
1651        Ok(Arc::new(DataSinkExec::new(
1652            input,
1653            Arc::new(data_sink),
1654            sort_order,
1655        )))
1656    }
1657
1658    fn try_into_csv_sink_physical_plan(
1659        &self,
1660        sink: &protobuf::CsvSinkExecNode,
1661        ctx: &TaskContext,
1662
1663        extension_codec: &dyn PhysicalExtensionCodec,
1664    ) -> Result<Arc<dyn ExecutionPlan>> {
1665        let input = into_physical_plan(&sink.input, ctx, extension_codec)?;
1666
1667        let data_sink: CsvSink = sink
1668            .sink
1669            .as_ref()
1670            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1671            .try_into()?;
1672        let sink_schema = input.schema();
1673        let sort_order = sink
1674            .sort_order
1675            .as_ref()
1676            .map(|collection| {
1677                parse_physical_sort_exprs(
1678                    &collection.physical_sort_expr_nodes,
1679                    ctx,
1680                    &sink_schema,
1681                    extension_codec,
1682                )
1683                .map(|sort_exprs| {
1684                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1685                })
1686            })
1687            .transpose()?
1688            .flatten();
1689        Ok(Arc::new(DataSinkExec::new(
1690            input,
1691            Arc::new(data_sink),
1692            sort_order,
1693        )))
1694    }
1695
1696    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
1697    fn try_into_parquet_sink_physical_plan(
1698        &self,
1699        sink: &protobuf::ParquetSinkExecNode,
1700        ctx: &TaskContext,
1701
1702        extension_codec: &dyn PhysicalExtensionCodec,
1703    ) -> Result<Arc<dyn ExecutionPlan>> {
1704        #[cfg(feature = "parquet")]
1705        {
1706            let input = into_physical_plan(&sink.input, ctx, extension_codec)?;
1707
1708            let data_sink: ParquetSink = sink
1709                .sink
1710                .as_ref()
1711                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1712                .try_into()?;
1713            let sink_schema = input.schema();
1714            let sort_order = sink
1715                .sort_order
1716                .as_ref()
1717                .map(|collection| {
1718                    parse_physical_sort_exprs(
1719                        &collection.physical_sort_expr_nodes,
1720                        ctx,
1721                        &sink_schema,
1722                        extension_codec,
1723                    )
1724                    .map(|sort_exprs| {
1725                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1726                    })
1727                })
1728                .transpose()?
1729                .flatten();
1730            Ok(Arc::new(DataSinkExec::new(
1731                input,
1732                Arc::new(data_sink),
1733                sort_order,
1734            )))
1735        }
1736        #[cfg(not(feature = "parquet"))]
1737        panic!("Trying to use ParquetSink without `parquet` feature enabled");
1738    }
1739
1740    fn try_into_unnest_physical_plan(
1741        &self,
1742        unnest: &protobuf::UnnestExecNode,
1743        ctx: &TaskContext,
1744
1745        extension_codec: &dyn PhysicalExtensionCodec,
1746    ) -> Result<Arc<dyn ExecutionPlan>> {
1747        let input = into_physical_plan(&unnest.input, ctx, extension_codec)?;
1748
1749        Ok(Arc::new(UnnestExec::new(
1750            input,
1751            unnest
1752                .list_type_columns
1753                .iter()
1754                .map(|c| ListUnnest {
1755                    index_in_input_schema: c.index_in_input_schema as _,
1756                    depth: c.depth as _,
1757                })
1758                .collect(),
1759            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1760            Arc::new(convert_required!(unnest.schema)?),
1761            into_required!(unnest.options)?,
1762        )?))
1763    }
1764
1765    fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
1766        match name {
1767            protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
1768            protobuf::GenerateSeriesName::GsRange => "range",
1769        }
1770    }
1771    fn try_into_sort_join(
1772        &self,
1773        sort_join: &SortMergeJoinExecNode,
1774        ctx: &TaskContext,
1775
1776        extension_codec: &dyn PhysicalExtensionCodec,
1777    ) -> Result<Arc<dyn ExecutionPlan>> {
1778        let left = into_physical_plan(&sort_join.left, ctx, extension_codec)?;
1779        let left_schema = left.schema();
1780        let right = into_physical_plan(&sort_join.right, ctx, extension_codec)?;
1781        let right_schema = right.schema();
1782
1783        let filter = sort_join
1784            .filter
1785            .as_ref()
1786            .map(|f| {
1787                let schema = f
1788                    .schema
1789                    .as_ref()
1790                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1791                    .try_into()?;
1792
1793                let expression = parse_physical_expr(
1794                    f.expression.as_ref().ok_or_else(|| {
1795                        proto_error("Unexpected empty filter expression")
1796                    })?,
1797                    ctx,
1798                    &schema,
1799                    extension_codec,
1800                )?;
1801                let column_indices = f
1802                    .column_indices
1803                    .iter()
1804                    .map(|i| {
1805                        let side =
1806                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
1807                                proto_error(format!(
1808                                    "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
1809                                    i.side
1810                                ))
1811                            })?;
1812
1813                        Ok(ColumnIndex {
1814                            index: i.index as usize,
1815                            side: side.into(),
1816                        })
1817                    })
1818                    .collect::<Result<Vec<_>>>()?;
1819
1820                Ok(JoinFilter::new(
1821                    expression,
1822                    column_indices,
1823                    Arc::new(schema),
1824                ))
1825            })
1826            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1827
1828        let join_type =
1829            protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
1830                proto_error(format!(
1831                    "Received a SortMergeJoinExecNode message with unknown JoinType {}",
1832                    sort_join.join_type
1833                ))
1834            })?;
1835
1836        let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
1837            .map_err(|_| {
1838                proto_error(format!(
1839                    "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
1840                    sort_join.null_equality
1841                ))
1842            })?;
1843
1844        let sort_options = sort_join
1845            .sort_options
1846            .iter()
1847            .map(|e| SortOptions {
1848                descending: !e.asc,
1849                nulls_first: e.nulls_first,
1850            })
1851            .collect();
1852        let on = sort_join
1853            .on
1854            .iter()
1855            .map(|col| {
1856                let left = parse_physical_expr(
1857                    &col.left.clone().unwrap(),
1858                    ctx,
1859                    left_schema.as_ref(),
1860                    extension_codec,
1861                )?;
1862                let right = parse_physical_expr(
1863                    &col.right.clone().unwrap(),
1864                    ctx,
1865                    right_schema.as_ref(),
1866                    extension_codec,
1867                )?;
1868                Ok((left, right))
1869            })
1870            .collect::<Result<_>>()?;
1871
1872        Ok(Arc::new(SortMergeJoinExec::try_new(
1873            left,
1874            right,
1875            on,
1876            filter,
1877            join_type.into(),
1878            sort_options,
1879            null_equality.into(),
1880        )?))
1881    }
1882
1883    fn try_into_generate_series_physical_plan(
1884        &self,
1885        generate_series: &protobuf::GenerateSeriesNode,
1886    ) -> Result<Arc<dyn ExecutionPlan>> {
1887        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);
1888
1889        let args = match &generate_series.args {
1890            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
1891                GenSeriesArgs::ContainsNull {
1892                    name: Self::generate_series_name_to_str(args.name()),
1893                }
1894            }
1895            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
1896                GenSeriesArgs::Int64Args {
1897                    start: args.start,
1898                    end: args.end,
1899                    step: args.step,
1900                    include_end: args.include_end,
1901                    name: Self::generate_series_name_to_str(args.name()),
1902                }
1903            }
1904            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
1905                let step_proto = args.step.as_ref().ok_or_else(|| {
1906                    internal_datafusion_err!("Missing step in TimestampArgs")
1907                })?;
1908                let step = IntervalMonthDayNanoType::make_value(
1909                    step_proto.months,
1910                    step_proto.days,
1911                    step_proto.nanos,
1912                );
1913                GenSeriesArgs::TimestampArgs {
1914                    start: args.start,
1915                    end: args.end,
1916                    step,
1917                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
1918                    include_end: args.include_end,
1919                    name: Self::generate_series_name_to_str(args.name()),
1920                }
1921            }
1922            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
1923                let step_proto = args.step.as_ref().ok_or_else(|| {
1924                    internal_datafusion_err!("Missing step in DateArgs")
1925                })?;
1926                let step = IntervalMonthDayNanoType::make_value(
1927                    step_proto.months,
1928                    step_proto.days,
1929                    step_proto.nanos,
1930                );
1931                GenSeriesArgs::DateArgs {
1932                    start: args.start,
1933                    end: args.end,
1934                    step,
1935                    include_end: args.include_end,
1936                    name: Self::generate_series_name_to_str(args.name()),
1937                }
1938            }
1939            None => return internal_err!("Missing args in GenerateSeriesNode"),
1940        };
1941
1942        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
1943        let generator = table.as_generator(generate_series.target_batch_size as usize)?;
1944
1945        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
1946    }
1947
1948    fn try_into_cooperative_physical_plan(
1949        &self,
1950        field_stream: &protobuf::CooperativeExecNode,
1951        ctx: &TaskContext,
1952
1953        extension_codec: &dyn PhysicalExtensionCodec,
1954    ) -> Result<Arc<dyn ExecutionPlan>> {
1955        let input = into_physical_plan(&field_stream.input, ctx, extension_codec)?;
1956        Ok(Arc::new(CooperativeExec::new(input)))
1957    }
1958
1959    fn try_from_explain_exec(
1960        exec: &ExplainExec,
1961        _extension_codec: &dyn PhysicalExtensionCodec,
1962    ) -> Result<Self> {
1963        Ok(protobuf::PhysicalPlanNode {
1964            physical_plan_type: Some(PhysicalPlanType::Explain(
1965                protobuf::ExplainExecNode {
1966                    schema: Some(exec.schema().as_ref().try_into()?),
1967                    stringified_plans: exec
1968                        .stringified_plans()
1969                        .iter()
1970                        .map(|plan| plan.into())
1971                        .collect(),
1972                    verbose: exec.verbose(),
1973                },
1974            )),
1975        })
1976    }
1977
1978    fn try_from_projection_exec(
1979        exec: &ProjectionExec,
1980        extension_codec: &dyn PhysicalExtensionCodec,
1981    ) -> Result<Self> {
1982        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1983            exec.input().to_owned(),
1984            extension_codec,
1985        )?;
1986        let expr = exec
1987            .expr()
1988            .iter()
1989            .map(|proj_expr| serialize_physical_expr(&proj_expr.expr, extension_codec))
1990            .collect::<Result<Vec<_>>>()?;
1991        let expr_name = exec
1992            .expr()
1993            .iter()
1994            .map(|proj_expr| proj_expr.alias.clone())
1995            .collect();
1996        Ok(protobuf::PhysicalPlanNode {
1997            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1998                protobuf::ProjectionExecNode {
1999                    input: Some(Box::new(input)),
2000                    expr,
2001                    expr_name,
2002                },
2003            ))),
2004        })
2005    }
2006
2007    fn try_from_analyze_exec(
2008        exec: &AnalyzeExec,
2009        extension_codec: &dyn PhysicalExtensionCodec,
2010    ) -> Result<Self> {
2011        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2012            exec.input().to_owned(),
2013            extension_codec,
2014        )?;
2015        Ok(protobuf::PhysicalPlanNode {
2016            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2017                protobuf::AnalyzeExecNode {
2018                    verbose: exec.verbose(),
2019                    show_statistics: exec.show_statistics(),
2020                    input: Some(Box::new(input)),
2021                    schema: Some(exec.schema().as_ref().try_into()?),
2022                },
2023            ))),
2024        })
2025    }
2026
2027    fn try_from_filter_exec(
2028        exec: &FilterExec,
2029        extension_codec: &dyn PhysicalExtensionCodec,
2030    ) -> Result<Self> {
2031        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2032            exec.input().to_owned(),
2033            extension_codec,
2034        )?;
2035        Ok(protobuf::PhysicalPlanNode {
2036            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2037                protobuf::FilterExecNode {
2038                    input: Some(Box::new(input)),
2039                    expr: Some(serialize_physical_expr(
2040                        exec.predicate(),
2041                        extension_codec,
2042                    )?),
2043                    default_filter_selectivity: exec.default_selectivity() as u32,
2044                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
2045                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2046                    }),
2047                },
2048            ))),
2049        })
2050    }
2051
2052    fn try_from_global_limit_exec(
2053        limit: &GlobalLimitExec,
2054        extension_codec: &dyn PhysicalExtensionCodec,
2055    ) -> Result<Self> {
2056        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2057            limit.input().to_owned(),
2058            extension_codec,
2059        )?;
2060
2061        Ok(protobuf::PhysicalPlanNode {
2062            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2063                protobuf::GlobalLimitExecNode {
2064                    input: Some(Box::new(input)),
2065                    skip: limit.skip() as u32,
2066                    fetch: match limit.fetch() {
2067                        Some(n) => n as i64,
2068                        _ => -1, // no limit
2069                    },
2070                },
2071            ))),
2072        })
2073    }
2074
2075    fn try_from_local_limit_exec(
2076        limit: &LocalLimitExec,
2077        extension_codec: &dyn PhysicalExtensionCodec,
2078    ) -> Result<Self> {
2079        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2080            limit.input().to_owned(),
2081            extension_codec,
2082        )?;
2083        Ok(protobuf::PhysicalPlanNode {
2084            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2085                protobuf::LocalLimitExecNode {
2086                    input: Some(Box::new(input)),
2087                    fetch: limit.fetch() as u32,
2088                },
2089            ))),
2090        })
2091    }
2092
2093    fn try_from_hash_join_exec(
2094        exec: &HashJoinExec,
2095        extension_codec: &dyn PhysicalExtensionCodec,
2096    ) -> Result<Self> {
2097        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2098            exec.left().to_owned(),
2099            extension_codec,
2100        )?;
2101        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2102            exec.right().to_owned(),
2103            extension_codec,
2104        )?;
2105        let on: Vec<protobuf::JoinOn> = exec
2106            .on()
2107            .iter()
2108            .map(|tuple| {
2109                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2110                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2111                Ok::<_, DataFusionError>(protobuf::JoinOn {
2112                    left: Some(l),
2113                    right: Some(r),
2114                })
2115            })
2116            .collect::<Result<_>>()?;
2117        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2118        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2119        let filter = exec
2120            .filter()
2121            .as_ref()
2122            .map(|f| {
2123                let expression =
2124                    serialize_physical_expr(f.expression(), extension_codec)?;
2125                let column_indices = f
2126                    .column_indices()
2127                    .iter()
2128                    .map(|i| {
2129                        let side: protobuf::JoinSide = i.side.to_owned().into();
2130                        protobuf::ColumnIndex {
2131                            index: i.index as u32,
2132                            side: side.into(),
2133                        }
2134                    })
2135                    .collect();
2136                let schema = f.schema().as_ref().try_into()?;
2137                Ok(protobuf::JoinFilter {
2138                    expression: Some(expression),
2139                    column_indices,
2140                    schema: Some(schema),
2141                })
2142            })
2143            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2144
2145        let partition_mode = match exec.partition_mode() {
2146            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2147            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2148            PartitionMode::Auto => protobuf::PartitionMode::Auto,
2149        };
2150
2151        Ok(protobuf::PhysicalPlanNode {
2152            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2153                protobuf::HashJoinExecNode {
2154                    left: Some(Box::new(left)),
2155                    right: Some(Box::new(right)),
2156                    on,
2157                    join_type: join_type.into(),
2158                    partition_mode: partition_mode.into(),
2159                    null_equality: null_equality.into(),
2160                    filter,
2161                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2162                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2163                    }),
2164                },
2165            ))),
2166        })
2167    }
2168
2169    fn try_from_symmetric_hash_join_exec(
2170        exec: &SymmetricHashJoinExec,
2171        extension_codec: &dyn PhysicalExtensionCodec,
2172    ) -> Result<Self> {
2173        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2174            exec.left().to_owned(),
2175            extension_codec,
2176        )?;
2177        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2178            exec.right().to_owned(),
2179            extension_codec,
2180        )?;
2181        let on = exec
2182            .on()
2183            .iter()
2184            .map(|tuple| {
2185                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2186                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2187                Ok::<_, DataFusionError>(protobuf::JoinOn {
2188                    left: Some(l),
2189                    right: Some(r),
2190                })
2191            })
2192            .collect::<Result<_>>()?;
2193        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2194        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2195        let filter = exec
2196            .filter()
2197            .as_ref()
2198            .map(|f| {
2199                let expression =
2200                    serialize_physical_expr(f.expression(), extension_codec)?;
2201                let column_indices = f
2202                    .column_indices()
2203                    .iter()
2204                    .map(|i| {
2205                        let side: protobuf::JoinSide = i.side.to_owned().into();
2206                        protobuf::ColumnIndex {
2207                            index: i.index as u32,
2208                            side: side.into(),
2209                        }
2210                    })
2211                    .collect();
2212                let schema = f.schema().as_ref().try_into()?;
2213                Ok(protobuf::JoinFilter {
2214                    expression: Some(expression),
2215                    column_indices,
2216                    schema: Some(schema),
2217                })
2218            })
2219            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2220
2221        let partition_mode = match exec.partition_mode() {
2222            StreamJoinPartitionMode::SinglePartition => {
2223                protobuf::StreamPartitionMode::SinglePartition
2224            }
2225            StreamJoinPartitionMode::Partitioned => {
2226                protobuf::StreamPartitionMode::PartitionedExec
2227            }
2228        };
2229
2230        let left_sort_exprs = exec
2231            .left_sort_exprs()
2232            .map(|exprs| {
2233                exprs
2234                    .iter()
2235                    .map(|expr| {
2236                        Ok(protobuf::PhysicalSortExprNode {
2237                            expr: Some(Box::new(serialize_physical_expr(
2238                                &expr.expr,
2239                                extension_codec,
2240                            )?)),
2241                            asc: !expr.options.descending,
2242                            nulls_first: expr.options.nulls_first,
2243                        })
2244                    })
2245                    .collect::<Result<Vec<_>>>()
2246            })
2247            .transpose()?
2248            .unwrap_or(vec![]);
2249
2250        let right_sort_exprs = exec
2251            .right_sort_exprs()
2252            .map(|exprs| {
2253                exprs
2254                    .iter()
2255                    .map(|expr| {
2256                        Ok(protobuf::PhysicalSortExprNode {
2257                            expr: Some(Box::new(serialize_physical_expr(
2258                                &expr.expr,
2259                                extension_codec,
2260                            )?)),
2261                            asc: !expr.options.descending,
2262                            nulls_first: expr.options.nulls_first,
2263                        })
2264                    })
2265                    .collect::<Result<Vec<_>>>()
2266            })
2267            .transpose()?
2268            .unwrap_or(vec![]);
2269
2270        Ok(protobuf::PhysicalPlanNode {
2271            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2272                protobuf::SymmetricHashJoinExecNode {
2273                    left: Some(Box::new(left)),
2274                    right: Some(Box::new(right)),
2275                    on,
2276                    join_type: join_type.into(),
2277                    partition_mode: partition_mode.into(),
2278                    null_equality: null_equality.into(),
2279                    left_sort_exprs,
2280                    right_sort_exprs,
2281                    filter,
2282                },
2283            ))),
2284        })
2285    }
2286
2287    fn try_from_sort_merge_join_exec(
2288        exec: &SortMergeJoinExec,
2289        extension_codec: &dyn PhysicalExtensionCodec,
2290    ) -> Result<Self> {
2291        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2292            exec.left().to_owned(),
2293            extension_codec,
2294        )?;
2295        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2296            exec.right().to_owned(),
2297            extension_codec,
2298        )?;
2299        let on = exec
2300            .on()
2301            .iter()
2302            .map(|tuple| {
2303                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2304                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2305                Ok::<_, DataFusionError>(protobuf::JoinOn {
2306                    left: Some(l),
2307                    right: Some(r),
2308                })
2309            })
2310            .collect::<Result<_>>()?;
2311        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2312        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2313        let filter = exec
2314            .filter()
2315            .as_ref()
2316            .map(|f| {
2317                let expression =
2318                    serialize_physical_expr(f.expression(), extension_codec)?;
2319                let column_indices = f
2320                    .column_indices()
2321                    .iter()
2322                    .map(|i| {
2323                        let side: protobuf::JoinSide = i.side.to_owned().into();
2324                        protobuf::ColumnIndex {
2325                            index: i.index as u32,
2326                            side: side.into(),
2327                        }
2328                    })
2329                    .collect();
2330                let schema = f.schema().as_ref().try_into()?;
2331                Ok(protobuf::JoinFilter {
2332                    expression: Some(expression),
2333                    column_indices,
2334                    schema: Some(schema),
2335                })
2336            })
2337            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2338
2339        let sort_options = exec
2340            .sort_options()
2341            .iter()
2342            .map(
2343                |SortOptions {
2344                     descending,
2345                     nulls_first,
2346                 }| {
2347                    SortExprNode {
2348                        expr: None,
2349                        asc: !*descending,
2350                        nulls_first: *nulls_first,
2351                    }
2352                },
2353            )
2354            .collect();
2355
2356        Ok(protobuf::PhysicalPlanNode {
2357            physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2358                protobuf::SortMergeJoinExecNode {
2359                    left: Some(Box::new(left)),
2360                    right: Some(Box::new(right)),
2361                    on,
2362                    join_type: join_type.into(),
2363                    null_equality: null_equality.into(),
2364                    filter,
2365                    sort_options,
2366                },
2367            ))),
2368        })
2369    }
2370
2371    fn try_from_cross_join_exec(
2372        exec: &CrossJoinExec,
2373        extension_codec: &dyn PhysicalExtensionCodec,
2374    ) -> Result<Self> {
2375        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2376            exec.left().to_owned(),
2377            extension_codec,
2378        )?;
2379        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2380            exec.right().to_owned(),
2381            extension_codec,
2382        )?;
2383        Ok(protobuf::PhysicalPlanNode {
2384            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2385                protobuf::CrossJoinExecNode {
2386                    left: Some(Box::new(left)),
2387                    right: Some(Box::new(right)),
2388                },
2389            ))),
2390        })
2391    }
2392
2393    fn try_from_aggregate_exec(
2394        exec: &AggregateExec,
2395        extension_codec: &dyn PhysicalExtensionCodec,
2396    ) -> Result<Self> {
2397        let groups: Vec<bool> = exec
2398            .group_expr()
2399            .groups()
2400            .iter()
2401            .flatten()
2402            .copied()
2403            .collect();
2404
2405        let group_names = exec
2406            .group_expr()
2407            .expr()
2408            .iter()
2409            .map(|expr| expr.1.to_owned())
2410            .collect();
2411
2412        let filter = exec
2413            .filter_expr()
2414            .iter()
2415            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2416            .collect::<Result<Vec<_>>>()?;
2417
2418        let agg = exec
2419            .aggr_expr()
2420            .iter()
2421            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2422            .collect::<Result<Vec<_>>>()?;
2423
2424        let agg_names = exec
2425            .aggr_expr()
2426            .iter()
2427            .map(|expr| expr.name().to_string())
2428            .collect::<Vec<_>>();
2429
2430        let agg_mode = match exec.mode() {
2431            AggregateMode::Partial => protobuf::AggregateMode::Partial,
2432            AggregateMode::Final => protobuf::AggregateMode::Final,
2433            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2434            AggregateMode::Single => protobuf::AggregateMode::Single,
2435            AggregateMode::SinglePartitioned => {
2436                protobuf::AggregateMode::SinglePartitioned
2437            }
2438        };
2439        let input_schema = exec.input_schema();
2440        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2441            exec.input().to_owned(),
2442            extension_codec,
2443        )?;
2444
2445        let null_expr = exec
2446            .group_expr()
2447            .null_expr()
2448            .iter()
2449            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2450            .collect::<Result<Vec<_>>>()?;
2451
2452        let group_expr = exec
2453            .group_expr()
2454            .expr()
2455            .iter()
2456            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2457            .collect::<Result<Vec<_>>>()?;
2458
2459        let limit = exec.limit().map(|value| protobuf::AggLimit {
2460            limit: value as u64,
2461        });
2462
2463        Ok(protobuf::PhysicalPlanNode {
2464            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2465                protobuf::AggregateExecNode {
2466                    group_expr,
2467                    group_expr_name: group_names,
2468                    aggr_expr: agg,
2469                    filter_expr: filter,
2470                    aggr_expr_name: agg_names,
2471                    mode: agg_mode as i32,
2472                    input: Some(Box::new(input)),
2473                    input_schema: Some(input_schema.as_ref().try_into()?),
2474                    null_expr,
2475                    groups,
2476                    limit,
2477                },
2478            ))),
2479        })
2480    }
2481
2482    fn try_from_empty_exec(
2483        empty: &EmptyExec,
2484        _extension_codec: &dyn PhysicalExtensionCodec,
2485    ) -> Result<Self> {
2486        let schema = empty.schema().as_ref().try_into()?;
2487        Ok(protobuf::PhysicalPlanNode {
2488            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2489                schema: Some(schema),
2490            })),
2491        })
2492    }
2493
2494    fn try_from_placeholder_row_exec(
2495        empty: &PlaceholderRowExec,
2496        _extension_codec: &dyn PhysicalExtensionCodec,
2497    ) -> Result<Self> {
2498        let schema = empty.schema().as_ref().try_into()?;
2499        Ok(protobuf::PhysicalPlanNode {
2500            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2501                protobuf::PlaceholderRowExecNode {
2502                    schema: Some(schema),
2503                },
2504            )),
2505        })
2506    }
2507
2508    fn try_from_coalesce_batches_exec(
2509        coalesce_batches: &CoalesceBatchesExec,
2510        extension_codec: &dyn PhysicalExtensionCodec,
2511    ) -> Result<Self> {
2512        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2513            coalesce_batches.input().to_owned(),
2514            extension_codec,
2515        )?;
2516        Ok(protobuf::PhysicalPlanNode {
2517            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2518                protobuf::CoalesceBatchesExecNode {
2519                    input: Some(Box::new(input)),
2520                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2521                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2522                },
2523            ))),
2524        })
2525    }
2526
2527    fn try_from_data_source_exec(
2528        data_source_exec: &DataSourceExec,
2529        extension_codec: &dyn PhysicalExtensionCodec,
2530    ) -> Result<Option<Self>> {
2531        let data_source = data_source_exec.data_source();
2532        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2533            let source = maybe_csv.file_source();
2534            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2535                return Ok(Some(protobuf::PhysicalPlanNode {
2536                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
2537                        protobuf::CsvScanExecNode {
2538                            base_conf: Some(serialize_file_scan_config(
2539                                maybe_csv,
2540                                extension_codec,
2541                            )?),
2542                            has_header: csv_config.has_header(),
2543                            delimiter: byte_to_string(
2544                                csv_config.delimiter(),
2545                                "delimiter",
2546                            )?,
2547                            quote: byte_to_string(csv_config.quote(), "quote")?,
2548                            optional_escape: if let Some(escape) = csv_config.escape() {
2549                                Some(
2550                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2551                                        byte_to_string(escape, "escape")?,
2552                                    ),
2553                                )
2554                            } else {
2555                                None
2556                            },
2557                            optional_comment: if let Some(comment) = csv_config.comment()
2558                            {
2559                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2560                                        byte_to_string(comment, "comment")?,
2561                                    ))
2562                            } else {
2563                                None
2564                            },
2565                            newlines_in_values: maybe_csv.newlines_in_values(),
2566                            truncate_rows: csv_config.truncate_rows(),
2567                        },
2568                    )),
2569                }));
2570            }
2571        }
2572
2573        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2574            let source = scan_conf.file_source();
2575            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2576                return Ok(Some(protobuf::PhysicalPlanNode {
2577                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
2578                        protobuf::JsonScanExecNode {
2579                            base_conf: Some(serialize_file_scan_config(
2580                                scan_conf,
2581                                extension_codec,
2582                            )?),
2583                        },
2584                    )),
2585                }));
2586            }
2587        }
2588
2589        #[cfg(feature = "parquet")]
2590        if let Some((maybe_parquet, conf)) =
2591            data_source_exec.downcast_to_file_source::<ParquetSource>()
2592        {
2593            let predicate = conf
2594                .filter()
2595                .map(|pred| serialize_physical_expr(&pred, extension_codec))
2596                .transpose()?;
2597            return Ok(Some(protobuf::PhysicalPlanNode {
2598                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2599                    protobuf::ParquetScanExecNode {
2600                        base_conf: Some(serialize_file_scan_config(
2601                            maybe_parquet,
2602                            extension_codec,
2603                        )?),
2604                        predicate,
2605                        parquet_options: Some(conf.table_parquet_options().try_into()?),
2606                    },
2607                )),
2608            }));
2609        }
2610
2611        #[cfg(feature = "avro")]
2612        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2613            let source = maybe_avro.file_source();
2614            if source.as_any().downcast_ref::<AvroSource>().is_some() {
2615                return Ok(Some(protobuf::PhysicalPlanNode {
2616                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
2617                        protobuf::AvroScanExecNode {
2618                            base_conf: Some(serialize_file_scan_config(
2619                                maybe_avro,
2620                                extension_codec,
2621                            )?),
2622                        },
2623                    )),
2624                }));
2625            }
2626        }
2627
2628        if let Some(source_conf) =
2629            data_source.as_any().downcast_ref::<MemorySourceConfig>()
2630        {
2631            let proto_partitions = source_conf
2632                .partitions()
2633                .iter()
2634                .map(|p| serialize_record_batches(p))
2635                .collect::<Result<Vec<_>>>()?;
2636
2637            let proto_schema: protobuf::Schema =
2638                source_conf.original_schema().as_ref().try_into()?;
2639
2640            let proto_projection = source_conf
2641                .projection()
2642                .as_ref()
2643                .map_or_else(Vec::new, |v| {
2644                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2645                });
2646
2647            let proto_sort_information = source_conf
2648                .sort_information()
2649                .iter()
2650                .map(|ordering| {
2651                    let sort_exprs = serialize_physical_sort_exprs(
2652                        ordering.to_owned(),
2653                        extension_codec,
2654                    )?;
2655                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
2656                        physical_sort_expr_nodes: sort_exprs,
2657                    })
2658                })
2659                .collect::<Result<Vec<_>, _>>()?;
2660
2661            return Ok(Some(protobuf::PhysicalPlanNode {
2662                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
2663                    protobuf::MemoryScanExecNode {
2664                        partitions: proto_partitions,
2665                        schema: Some(proto_schema),
2666                        projection: proto_projection,
2667                        sort_information: proto_sort_information,
2668                        show_sizes: source_conf.show_sizes(),
2669                        fetch: source_conf.fetch().map(|f| f as u32),
2670                    },
2671                )),
2672            }));
2673        }
2674
2675        Ok(None)
2676    }
2677
2678    fn try_from_coalesce_partitions_exec(
2679        exec: &CoalescePartitionsExec,
2680        extension_codec: &dyn PhysicalExtensionCodec,
2681    ) -> Result<Self> {
2682        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2683            exec.input().to_owned(),
2684            extension_codec,
2685        )?;
2686        Ok(protobuf::PhysicalPlanNode {
2687            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2688                protobuf::CoalescePartitionsExecNode {
2689                    input: Some(Box::new(input)),
2690                    fetch: exec.fetch().map(|f| f as u32),
2691                },
2692            ))),
2693        })
2694    }
2695
2696    fn try_from_repartition_exec(
2697        exec: &RepartitionExec,
2698        extension_codec: &dyn PhysicalExtensionCodec,
2699    ) -> Result<Self> {
2700        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2701            exec.input().to_owned(),
2702            extension_codec,
2703        )?;
2704
2705        let pb_partitioning =
2706            serialize_partitioning(exec.partitioning(), extension_codec)?;
2707
2708        Ok(protobuf::PhysicalPlanNode {
2709            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2710                protobuf::RepartitionExecNode {
2711                    input: Some(Box::new(input)),
2712                    partitioning: Some(pb_partitioning),
2713                },
2714            ))),
2715        })
2716    }
2717
2718    fn try_from_sort_exec(
2719        exec: &SortExec,
2720        extension_codec: &dyn PhysicalExtensionCodec,
2721    ) -> Result<Self> {
2722        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2723            exec.input().to_owned(),
2724            extension_codec,
2725        )?;
2726        let expr = exec
2727            .expr()
2728            .iter()
2729            .map(|expr| {
2730                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2731                    expr: Some(Box::new(serialize_physical_expr(
2732                        &expr.expr,
2733                        extension_codec,
2734                    )?)),
2735                    asc: !expr.options.descending,
2736                    nulls_first: expr.options.nulls_first,
2737                });
2738                Ok(protobuf::PhysicalExprNode {
2739                    expr_type: Some(ExprType::Sort(sort_expr)),
2740                })
2741            })
2742            .collect::<Result<Vec<_>>>()?;
2743        Ok(protobuf::PhysicalPlanNode {
2744            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2745                protobuf::SortExecNode {
2746                    input: Some(Box::new(input)),
2747                    expr,
2748                    fetch: match exec.fetch() {
2749                        Some(n) => n as i64,
2750                        _ => -1,
2751                    },
2752                    preserve_partitioning: exec.preserve_partitioning(),
2753                },
2754            ))),
2755        })
2756    }
2757
2758    fn try_from_union_exec(
2759        union: &UnionExec,
2760        extension_codec: &dyn PhysicalExtensionCodec,
2761    ) -> Result<Self> {
2762        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2763        for input in union.inputs() {
2764            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2765                input.to_owned(),
2766                extension_codec,
2767            )?);
2768        }
2769        Ok(protobuf::PhysicalPlanNode {
2770            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2771                inputs,
2772            })),
2773        })
2774    }
2775
2776    fn try_from_interleave_exec(
2777        interleave: &InterleaveExec,
2778        extension_codec: &dyn PhysicalExtensionCodec,
2779    ) -> Result<Self> {
2780        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2781        for input in interleave.inputs() {
2782            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2783                input.to_owned(),
2784                extension_codec,
2785            )?);
2786        }
2787        Ok(protobuf::PhysicalPlanNode {
2788            physical_plan_type: Some(PhysicalPlanType::Interleave(
2789                protobuf::InterleaveExecNode { inputs },
2790            )),
2791        })
2792    }
2793
2794    fn try_from_sort_preserving_merge_exec(
2795        exec: &SortPreservingMergeExec,
2796        extension_codec: &dyn PhysicalExtensionCodec,
2797    ) -> Result<Self> {
2798        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2799            exec.input().to_owned(),
2800            extension_codec,
2801        )?;
2802        let expr = exec
2803            .expr()
2804            .iter()
2805            .map(|expr| {
2806                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2807                    expr: Some(Box::new(serialize_physical_expr(
2808                        &expr.expr,
2809                        extension_codec,
2810                    )?)),
2811                    asc: !expr.options.descending,
2812                    nulls_first: expr.options.nulls_first,
2813                });
2814                Ok(protobuf::PhysicalExprNode {
2815                    expr_type: Some(ExprType::Sort(sort_expr)),
2816                })
2817            })
2818            .collect::<Result<Vec<_>>>()?;
2819        Ok(protobuf::PhysicalPlanNode {
2820            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2821                protobuf::SortPreservingMergeExecNode {
2822                    input: Some(Box::new(input)),
2823                    expr,
2824                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2825                },
2826            ))),
2827        })
2828    }
2829
2830    fn try_from_nested_loop_join_exec(
2831        exec: &NestedLoopJoinExec,
2832        extension_codec: &dyn PhysicalExtensionCodec,
2833    ) -> Result<Self> {
2834        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2835            exec.left().to_owned(),
2836            extension_codec,
2837        )?;
2838        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2839            exec.right().to_owned(),
2840            extension_codec,
2841        )?;
2842
2843        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2844        let filter = exec
2845            .filter()
2846            .as_ref()
2847            .map(|f| {
2848                let expression =
2849                    serialize_physical_expr(f.expression(), extension_codec)?;
2850                let column_indices = f
2851                    .column_indices()
2852                    .iter()
2853                    .map(|i| {
2854                        let side: protobuf::JoinSide = i.side.to_owned().into();
2855                        protobuf::ColumnIndex {
2856                            index: i.index as u32,
2857                            side: side.into(),
2858                        }
2859                    })
2860                    .collect();
2861                let schema = f.schema().as_ref().try_into()?;
2862                Ok(protobuf::JoinFilter {
2863                    expression: Some(expression),
2864                    column_indices,
2865                    schema: Some(schema),
2866                })
2867            })
2868            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2869
2870        Ok(protobuf::PhysicalPlanNode {
2871            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2872                protobuf::NestedLoopJoinExecNode {
2873                    left: Some(Box::new(left)),
2874                    right: Some(Box::new(right)),
2875                    join_type: join_type.into(),
2876                    filter,
2877                    projection: exec.projection().map_or_else(Vec::new, |v| {
2878                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2879                    }),
2880                },
2881            ))),
2882        })
2883    }
2884
2885    fn try_from_window_agg_exec(
2886        exec: &WindowAggExec,
2887        extension_codec: &dyn PhysicalExtensionCodec,
2888    ) -> Result<Self> {
2889        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2890            exec.input().to_owned(),
2891            extension_codec,
2892        )?;
2893
2894        let window_expr = exec
2895            .window_expr()
2896            .iter()
2897            .map(|e| serialize_physical_window_expr(e, extension_codec))
2898            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2899
2900        let partition_keys = exec
2901            .partition_keys()
2902            .iter()
2903            .map(|e| serialize_physical_expr(e, extension_codec))
2904            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2905
2906        Ok(protobuf::PhysicalPlanNode {
2907            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2908                protobuf::WindowAggExecNode {
2909                    input: Some(Box::new(input)),
2910                    window_expr,
2911                    partition_keys,
2912                    input_order_mode: None,
2913                },
2914            ))),
2915        })
2916    }
2917
2918    fn try_from_bounded_window_agg_exec(
2919        exec: &BoundedWindowAggExec,
2920        extension_codec: &dyn PhysicalExtensionCodec,
2921    ) -> Result<Self> {
2922        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2923            exec.input().to_owned(),
2924            extension_codec,
2925        )?;
2926
2927        let window_expr = exec
2928            .window_expr()
2929            .iter()
2930            .map(|e| serialize_physical_window_expr(e, extension_codec))
2931            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2932
2933        let partition_keys = exec
2934            .partition_keys()
2935            .iter()
2936            .map(|e| serialize_physical_expr(e, extension_codec))
2937            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2938
2939        let input_order_mode = match &exec.input_order_mode {
2940            InputOrderMode::Linear => {
2941                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2942            }
2943            InputOrderMode::PartiallySorted(columns) => {
2944                window_agg_exec_node::InputOrderMode::PartiallySorted(
2945                    protobuf::PartiallySortedInputOrderMode {
2946                        columns: columns.iter().map(|c| *c as u64).collect(),
2947                    },
2948                )
2949            }
2950            InputOrderMode::Sorted => {
2951                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
2952            }
2953        };
2954
2955        Ok(protobuf::PhysicalPlanNode {
2956            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2957                protobuf::WindowAggExecNode {
2958                    input: Some(Box::new(input)),
2959                    window_expr,
2960                    partition_keys,
2961                    input_order_mode: Some(input_order_mode),
2962                },
2963            ))),
2964        })
2965    }
2966
2967    fn try_from_data_sink_exec(
2968        exec: &DataSinkExec,
2969        extension_codec: &dyn PhysicalExtensionCodec,
2970    ) -> Result<Option<Self>> {
2971        let input: protobuf::PhysicalPlanNode =
2972            protobuf::PhysicalPlanNode::try_from_physical_plan(
2973                exec.input().to_owned(),
2974                extension_codec,
2975            )?;
2976        let sort_order = match exec.sort_order() {
2977            Some(requirements) => {
2978                let expr = requirements
2979                    .iter()
2980                    .map(|requirement| {
2981                        let expr: PhysicalSortExpr = requirement.to_owned().into();
2982                        let sort_expr = protobuf::PhysicalSortExprNode {
2983                            expr: Some(Box::new(serialize_physical_expr(
2984                                &expr.expr,
2985                                extension_codec,
2986                            )?)),
2987                            asc: !expr.options.descending,
2988                            nulls_first: expr.options.nulls_first,
2989                        };
2990                        Ok(sort_expr)
2991                    })
2992                    .collect::<Result<Vec<_>>>()?;
2993                Some(protobuf::PhysicalSortExprNodeCollection {
2994                    physical_sort_expr_nodes: expr,
2995                })
2996            }
2997            None => None,
2998        };
2999
3000        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
3001            return Ok(Some(protobuf::PhysicalPlanNode {
3002                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
3003                    protobuf::JsonSinkExecNode {
3004                        input: Some(Box::new(input)),
3005                        sink: Some(sink.try_into()?),
3006                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3007                        sort_order,
3008                    },
3009                ))),
3010            }));
3011        }
3012
3013        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
3014            return Ok(Some(protobuf::PhysicalPlanNode {
3015                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
3016                    protobuf::CsvSinkExecNode {
3017                        input: Some(Box::new(input)),
3018                        sink: Some(sink.try_into()?),
3019                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3020                        sort_order,
3021                    },
3022                ))),
3023            }));
3024        }
3025
3026        #[cfg(feature = "parquet")]
3027        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
3028            return Ok(Some(protobuf::PhysicalPlanNode {
3029                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
3030                    protobuf::ParquetSinkExecNode {
3031                        input: Some(Box::new(input)),
3032                        sink: Some(sink.try_into()?),
3033                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3034                        sort_order,
3035                    },
3036                ))),
3037            }));
3038        }
3039
3040        // If unknown DataSink then let extension handle it
3041        Ok(None)
3042    }
3043
3044    fn try_from_unnest_exec(
3045        exec: &UnnestExec,
3046        extension_codec: &dyn PhysicalExtensionCodec,
3047    ) -> Result<Self> {
3048        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3049            exec.input().to_owned(),
3050            extension_codec,
3051        )?;
3052
3053        Ok(protobuf::PhysicalPlanNode {
3054            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3055                protobuf::UnnestExecNode {
3056                    input: Some(Box::new(input)),
3057                    schema: Some(exec.schema().try_into()?),
3058                    list_type_columns: exec
3059                        .list_column_indices()
3060                        .iter()
3061                        .map(|c| ProtoListUnnest {
3062                            index_in_input_schema: c.index_in_input_schema as _,
3063                            depth: c.depth as _,
3064                        })
3065                        .collect(),
3066                    struct_type_columns: exec
3067                        .struct_column_indices()
3068                        .iter()
3069                        .map(|c| *c as _)
3070                        .collect(),
3071                    options: Some(exec.options().into()),
3072                },
3073            ))),
3074        })
3075    }
3076
3077    fn try_from_cooperative_exec(
3078        exec: &CooperativeExec,
3079        extension_codec: &dyn PhysicalExtensionCodec,
3080    ) -> Result<Self> {
3081        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3082            exec.input().to_owned(),
3083            extension_codec,
3084        )?;
3085
3086        Ok(protobuf::PhysicalPlanNode {
3087            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3088                protobuf::CooperativeExecNode {
3089                    input: Some(Box::new(input)),
3090                },
3091            ))),
3092        })
3093    }
3094
3095    fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3096        match name {
3097            "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3098            "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3099            _ => internal_err!("unknown name: {name}"),
3100        }
3101    }
3102
3103    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
3104        let generators = exec.generators();
3105
3106        // ensure we only have one generator
3107        let [generator] = generators.as_slice() else {
3108            return Ok(None);
3109        };
3110
3111        let generator_guard = generator.read();
3112
3113        // Try to downcast to different generate_series types
3114        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
3115            let schema = exec.schema();
3116            let node = protobuf::GenerateSeriesNode {
3117                schema: Some(schema.as_ref().try_into()?),
3118                target_batch_size: 8192, // Default batch size
3119                args: Some(protobuf::generate_series_node::Args::ContainsNull(
3120                    protobuf::GenerateSeriesArgsContainsNull {
3121                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
3122                    },
3123                )),
3124            };
3125
3126            return Ok(Some(protobuf::PhysicalPlanNode {
3127                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3128            }));
3129        }
3130
3131        if let Some(int_64) = generator_guard
3132            .as_any()
3133            .downcast_ref::<GenericSeriesState<i64>>()
3134        {
3135            let schema = exec.schema();
3136            let node = protobuf::GenerateSeriesNode {
3137                schema: Some(schema.as_ref().try_into()?),
3138                target_batch_size: int_64.batch_size() as u32,
3139                args: Some(protobuf::generate_series_node::Args::Int64Args(
3140                    protobuf::GenerateSeriesArgsInt64 {
3141                        start: *int_64.start(),
3142                        end: *int_64.end(),
3143                        step: *int_64.step(),
3144                        include_end: int_64.include_end(),
3145                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
3146                    },
3147                )),
3148            };
3149
3150            return Ok(Some(protobuf::PhysicalPlanNode {
3151                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3152            }));
3153        }
3154
3155        if let Some(timestamp_args) = generator_guard
3156            .as_any()
3157            .downcast_ref::<GenericSeriesState<TimestampValue>>()
3158        {
3159            let schema = exec.schema();
3160
3161            let start = timestamp_args.start().value();
3162            let end = timestamp_args.end().value();
3163
3164            let step_value = timestamp_args.step();
3165
3166            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
3167                months: step_value.months,
3168                days: step_value.days,
3169                nanos: step_value.nanoseconds,
3170            });
3171            let include_end = timestamp_args.include_end();
3172            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;
3173
3174            let args = match timestamp_args.current().tz_str() {
3175                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
3176                    protobuf::GenerateSeriesArgsTimestamp {
3177                        start,
3178                        end,
3179                        step,
3180                        include_end,
3181                        name,
3182                        tz: Some(tz.to_string()),
3183                    },
3184                ),
3185                None => protobuf::generate_series_node::Args::DateArgs(
3186                    protobuf::GenerateSeriesArgsDate {
3187                        start,
3188                        end,
3189                        step,
3190                        include_end,
3191                        name,
3192                    },
3193                ),
3194            };
3195
3196            let node = protobuf::GenerateSeriesNode {
3197                schema: Some(schema.as_ref().try_into()?),
3198                target_batch_size: timestamp_args.batch_size() as u32,
3199                args: Some(args),
3200            };
3201
3202            return Ok(Some(protobuf::PhysicalPlanNode {
3203                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3204            }));
3205        }
3206
3207        Ok(None)
3208    }
3209}
3210
3211pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
3212    fn try_decode(buf: &[u8]) -> Result<Self>
3213    where
3214        Self: Sized;
3215
3216    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
3217    where
3218        B: BufMut,
3219        Self: Sized;
3220
3221    fn try_into_physical_plan(
3222        &self,
3223        ctx: &TaskContext,
3224
3225        extension_codec: &dyn PhysicalExtensionCodec,
3226    ) -> Result<Arc<dyn ExecutionPlan>>;
3227
3228    fn try_from_physical_plan(
3229        plan: Arc<dyn ExecutionPlan>,
3230        extension_codec: &dyn PhysicalExtensionCodec,
3231    ) -> Result<Self>
3232    where
3233        Self: Sized;
3234}
3235
3236pub trait PhysicalExtensionCodec: Debug + Send + Sync {
3237    fn try_decode(
3238        &self,
3239        buf: &[u8],
3240        inputs: &[Arc<dyn ExecutionPlan>],
3241        ctx: &TaskContext,
3242    ) -> Result<Arc<dyn ExecutionPlan>>;
3243
3244    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
3245
3246    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
3247        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
3248    }
3249
3250    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
3251        Ok(())
3252    }
3253
3254    fn try_decode_expr(
3255        &self,
3256        _buf: &[u8],
3257        _inputs: &[Arc<dyn PhysicalExpr>],
3258    ) -> Result<Arc<dyn PhysicalExpr>> {
3259        not_impl_err!("PhysicalExtensionCodec is not provided")
3260    }
3261
3262    fn try_encode_expr(
3263        &self,
3264        _node: &Arc<dyn PhysicalExpr>,
3265        _buf: &mut Vec<u8>,
3266    ) -> Result<()> {
3267        not_impl_err!("PhysicalExtensionCodec is not provided")
3268    }
3269
3270    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
3271        not_impl_err!(
3272            "PhysicalExtensionCodec is not provided for aggregate function {name}"
3273        )
3274    }
3275
3276    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
3277        Ok(())
3278    }
3279
3280    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
3281        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
3282    }
3283
3284    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
3285        Ok(())
3286    }
3287}
3288
/// A [`PhysicalExtensionCodec`] with no extension support: its plan
/// encode/decode methods always return a "not implemented" error.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
3291
3292impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
3293    fn try_decode(
3294        &self,
3295        _buf: &[u8],
3296        _inputs: &[Arc<dyn ExecutionPlan>],
3297        _ctx: &TaskContext,
3298    ) -> Result<Arc<dyn ExecutionPlan>> {
3299        not_impl_err!("PhysicalExtensionCodec is not provided")
3300    }
3301
3302    fn try_encode(
3303        &self,
3304        _node: Arc<dyn ExecutionPlan>,
3305        _buf: &mut Vec<u8>,
3306    ) -> Result<()> {
3307        not_impl_err!("PhysicalExtensionCodec is not provided")
3308    }
3309}
3310
3311/// DataEncoderTuple captures the position of the encoder
3312/// in the codec list that was used to encode the data and actual encoded data
3313#[derive(Clone, PartialEq, prost::Message)]
3314struct DataEncoderTuple {
3315    /// The position of encoder used to encode data
3316    /// (to be used for decoding)
3317    #[prost(uint32, tag = 1)]
3318    pub encoder_position: u32,
3319
3320    #[prost(bytes, tag = 2)]
3321    pub blob: Vec<u8>,
3322}
3323
3324/// A PhysicalExtensionCodec that tries one of multiple inner codecs
3325/// until one works
3326#[derive(Debug)]
3327pub struct ComposedPhysicalExtensionCodec {
3328    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
3329}
3330
3331impl ComposedPhysicalExtensionCodec {
3332    // Position in this codecs list is important as it will be used for decoding.
3333    // If new codec is added it should go to last position.
3334    pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
3335        Self { codecs }
3336    }
3337
3338    fn decode_protobuf<R>(
3339        &self,
3340        buf: &[u8],
3341        decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
3342    ) -> Result<R> {
3343        let proto =
3344            DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;
3345
3346        let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
3347            internal_datafusion_err!("Can't find required codec in codec list"),
3348        )?;
3349
3350        decode(codec.as_ref(), &proto.blob)
3351    }
3352
3353    fn encode_protobuf(
3354        &self,
3355        buf: &mut Vec<u8>,
3356        mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
3357    ) -> Result<()> {
3358        let mut data = vec![];
3359        let mut last_err = None;
3360        let mut encoder_position = None;
3361
3362        // find the encoder
3363        for (position, codec) in self.codecs.iter().enumerate() {
3364            match encode(codec.as_ref(), &mut data) {
3365                Ok(_) => {
3366                    encoder_position = Some(position as u32);
3367                    break;
3368                }
3369                Err(err) => last_err = Some(err),
3370            }
3371        }
3372
3373        let encoder_position = encoder_position.ok_or_else(|| {
3374            last_err.unwrap_or_else(|| {
3375                DataFusionError::NotImplemented(
3376                    "Empty list of composed codecs".to_owned(),
3377                )
3378            })
3379        })?;
3380
3381        // encode with encoder position
3382        let proto = DataEncoderTuple {
3383            encoder_position,
3384            blob: data,
3385        };
3386        proto
3387            .encode(buf)
3388            .map_err(|e| internal_datafusion_err!("{e}"))
3389    }
3390}
3391
3392impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
3393    fn try_decode(
3394        &self,
3395        buf: &[u8],
3396        inputs: &[Arc<dyn ExecutionPlan>],
3397        ctx: &TaskContext,
3398    ) -> Result<Arc<dyn ExecutionPlan>> {
3399        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
3400    }
3401
3402    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
3403        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
3404    }
3405
3406    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
3407        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
3408    }
3409
3410    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
3411        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
3412    }
3413
3414    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
3415        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
3416    }
3417
3418    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
3419        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
3420    }
3421}
3422
3423fn into_physical_plan(
3424    node: &Option<Box<protobuf::PhysicalPlanNode>>,
3425    ctx: &TaskContext,
3426
3427    extension_codec: &dyn PhysicalExtensionCodec,
3428) -> Result<Arc<dyn ExecutionPlan>> {
3429    if let Some(field) = node {
3430        field.try_into_physical_plan(ctx, extension_codec)
3431    } else {
3432        Err(proto_error("Missing required field in protobuf"))
3433    }
3434}