datafusion_proto/physical_plan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cell::RefCell;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::hash::{DefaultHasher, Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::compute::SortOptions;
25use arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
26use datafusion_catalog::memory::MemorySourceConfig;
27use datafusion_common::config::CsvOptions;
28use datafusion_common::{
29    DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
30};
31#[cfg(feature = "parquet")]
32use datafusion_datasource::file::FileSource;
33use datafusion_datasource::file_compression_type::FileCompressionType;
34use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
35use datafusion_datasource::sink::DataSinkExec;
36use datafusion_datasource::source::{DataSource, DataSourceExec};
37use datafusion_datasource_arrow::source::ArrowSource;
38#[cfg(feature = "avro")]
39use datafusion_datasource_avro::source::AvroSource;
40use datafusion_datasource_csv::file_format::CsvSink;
41use datafusion_datasource_csv::source::CsvSource;
42use datafusion_datasource_json::file_format::JsonSink;
43use datafusion_datasource_json::source::JsonSource;
44#[cfg(feature = "parquet")]
45use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
46#[cfg(feature = "parquet")]
47use datafusion_datasource_parquet::file_format::ParquetSink;
48#[cfg(feature = "parquet")]
49use datafusion_datasource_parquet::source::ParquetSource;
50#[cfg(feature = "parquet")]
51use datafusion_execution::object_store::ObjectStoreUrl;
52use datafusion_execution::{FunctionRegistry, TaskContext};
53use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
54use datafusion_functions_table::generate_series::{
55    Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
56};
57use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
58use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
59use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
60use datafusion_physical_plan::aggregates::{
61    AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy,
62};
63use datafusion_physical_plan::analyze::AnalyzeExec;
64use datafusion_physical_plan::async_func::AsyncFuncExec;
65use datafusion_physical_plan::buffer::BufferExec;
66#[expect(deprecated)]
67use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
68use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
69use datafusion_physical_plan::coop::CooperativeExec;
70use datafusion_physical_plan::empty::EmptyExec;
71use datafusion_physical_plan::explain::ExplainExec;
72use datafusion_physical_plan::expressions::PhysicalSortExpr;
73use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
74use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
75use datafusion_physical_plan::joins::{
76    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
77    StreamJoinPartitionMode, SymmetricHashJoinExec,
78};
79use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
80use datafusion_physical_plan::memory::LazyMemoryExec;
81use datafusion_physical_plan::metrics::MetricType;
82use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
83use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
84use datafusion_physical_plan::repartition::RepartitionExec;
85use datafusion_physical_plan::sorts::sort::SortExec;
86use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
87use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
88use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
89use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
90use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
91use prost::Message;
92use prost::bytes::BufMut;
93
94use self::from_proto::parse_protobuf_partitioning;
95use self::to_proto::serialize_partitioning;
96use crate::common::{byte_to_string, str_to_byte};
97use crate::physical_plan::from_proto::{
98    parse_physical_expr_with_converter, parse_physical_sort_expr,
99    parse_physical_sort_exprs, parse_physical_window_expr,
100    parse_protobuf_file_scan_config, parse_record_batches, parse_table_schema_from_proto,
101};
102use crate::physical_plan::to_proto::{
103    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
104    serialize_physical_expr_with_converter, serialize_physical_sort_exprs,
105    serialize_physical_window_expr, serialize_record_batches,
106};
107use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
108use crate::protobuf::physical_expr_node::ExprType;
109use crate::protobuf::physical_plan_node::PhysicalPlanType;
110use crate::protobuf::{
111    self, ListUnnest as ProtoListUnnest, SortExprNode, SortMergeJoinExecNode,
112    proto_error, window_agg_exec_node,
113};
114use crate::{convert_required, into_required};
115
/// Deserialization helpers: protobuf messages -> physical plan structures.
pub mod from_proto;
/// Serialization helpers: physical plan structures -> protobuf messages.
pub mod to_proto;
118
119impl AsExecutionPlan for protobuf::PhysicalPlanNode {
120    fn try_decode(buf: &[u8]) -> Result<Self>
121    where
122        Self: Sized,
123    {
124        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
125            internal_datafusion_err!("failed to decode physical plan: {e:?}")
126        })
127    }
128
129    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
130    where
131        B: BufMut,
132        Self: Sized,
133    {
134        self.encode(buf).map_err(|e| {
135            internal_datafusion_err!("failed to encode physical plan: {e:?}")
136        })
137    }
138
139    fn try_into_physical_plan(
140        &self,
141        ctx: &TaskContext,
142        codec: &dyn PhysicalExtensionCodec,
143    ) -> Result<Arc<dyn ExecutionPlan>> {
144        self.try_into_physical_plan_with_converter(
145            ctx,
146            codec,
147            &DefaultPhysicalProtoConverter {},
148        )
149    }
150
151    fn try_from_physical_plan(
152        plan: Arc<dyn ExecutionPlan>,
153        codec: &dyn PhysicalExtensionCodec,
154    ) -> Result<Self>
155    where
156        Self: Sized,
157    {
158        Self::try_from_physical_plan_with_converter(
159            plan,
160            codec,
161            &DefaultPhysicalProtoConverter {},
162        )
163    }
164}
165
impl protobuf::PhysicalPlanNode {
    /// Converts this protobuf node into an [`ExecutionPlan`], dispatching on
    /// the decoded `physical_plan_type` variant to a per-operator helper.
    ///
    /// `codec` decodes user-defined extension nodes; `proto_converter` decodes
    /// the physical expressions embedded in each operator.
    ///
    /// Returns an error if `physical_plan_type` is unset or any child
    /// conversion fails.
    pub fn try_into_physical_plan_with_converter(
        &self,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A node without a plan type cannot be converted; report which node
        // failed to aid debugging of corrupted or version-skewed payloads.
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        // One arm per supported operator; each helper recursively converts its
        // child nodes via `into_physical_plan`.
        match plan {
            PhysicalPlanType::Explain(explain) => {
                self.try_into_explain_physical_plan(explain, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Projection(projection) => self
                .try_into_projection_physical_plan(
                    projection,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Filter(filter) => {
                self.try_into_filter_physical_plan(filter, ctx, codec, proto_converter)
            }
            PhysicalPlanType::CsvScan(scan) => {
                self.try_into_csv_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::JsonScan(scan) => {
                self.try_into_json_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::ParquetScan(scan) => self
                .try_into_parquet_scan_physical_plan(scan, ctx, codec, proto_converter),
            PhysicalPlanType::AvroScan(scan) => {
                self.try_into_avro_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::MemoryScan(scan) => {
                self.try_into_memory_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::ArrowScan(scan) => {
                self.try_into_arrow_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Merge(merge) => {
                self.try_into_merge_physical_plan(merge, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Repartition(repart) => self
                .try_into_repartition_physical_plan(repart, ctx, codec, proto_converter),
            PhysicalPlanType::GlobalLimit(limit) => self
                .try_into_global_limit_physical_plan(limit, ctx, codec, proto_converter),
            PhysicalPlanType::LocalLimit(limit) => self
                .try_into_local_limit_physical_plan(limit, ctx, codec, proto_converter),
            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
                window_agg,
                ctx,
                codec,
                proto_converter,
            ),
            PhysicalPlanType::Aggregate(hash_agg) => self
                .try_into_aggregate_physical_plan(hash_agg, ctx, codec, proto_converter),
            PhysicalPlanType::HashJoin(hashjoin) => self
                .try_into_hash_join_physical_plan(hashjoin, ctx, codec, proto_converter),
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Union(union) => {
                self.try_into_union_physical_plan(union, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Interleave(interleave) => self
                .try_into_interleave_physical_plan(
                    interleave,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::CrossJoin(crossjoin) => self
                .try_into_cross_join_physical_plan(
                    crossjoin,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Empty(empty) => {
                self.try_into_empty_physical_plan(empty, ctx, codec, proto_converter)
            }
            PhysicalPlanType::PlaceholderRow(placeholder) => {
                self.try_into_placeholder_row_physical_plan(placeholder, ctx, codec)
            }
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, ctx, codec, proto_converter)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(
                    sort,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Extension(extension) => self
                .try_into_extension_physical_plan(extension, ctx, codec, proto_converter),
            PhysicalPlanType::NestedLoopJoin(join) => self
                .try_into_nested_loop_join_physical_plan(
                    join,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Analyze(analyze) => {
                self.try_into_analyze_physical_plan(analyze, ctx, codec, proto_converter)
            }
            PhysicalPlanType::JsonSink(sink) => {
                self.try_into_json_sink_physical_plan(sink, ctx, codec, proto_converter)
            }
            PhysicalPlanType::CsvSink(sink) => {
                self.try_into_csv_sink_physical_plan(sink, ctx, codec, proto_converter)
            }
            // Without the parquet feature the helper ignores its arguments.
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => self
                .try_into_parquet_sink_physical_plan(sink, ctx, codec, proto_converter),
            PhysicalPlanType::Unnest(unnest) => {
                self.try_into_unnest_physical_plan(unnest, ctx, codec, proto_converter)
            }
            PhysicalPlanType::Cooperative(cooperative) => self
                .try_into_cooperative_physical_plan(
                    cooperative,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::GenerateSeries(generate_series) => {
                self.try_into_generate_series_physical_plan(generate_series)
            }
            PhysicalPlanType::SortMergeJoin(sort_join) => {
                self.try_into_sort_join(sort_join, ctx, codec, proto_converter)
            }
            PhysicalPlanType::AsyncFunc(async_func) => self
                .try_into_async_func_physical_plan(
                    async_func,
                    ctx,
                    codec,
                    proto_converter,
                ),
            PhysicalPlanType::Buffer(buffer) => {
                self.try_into_buffer_physical_plan(buffer, ctx, codec, proto_converter)
            }
        }
    }

    /// Serializes an [`ExecutionPlan`] into a protobuf node.
    ///
    /// Tries each known operator type via `downcast_ref` in turn; the first
    /// match wins. Plans not covered by a built-in variant fall through to the
    /// user-supplied extension `codec`, and only if that also fails is an
    /// error returned.
    pub fn try_from_physical_plan_with_converter(
        plan: Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle for the extension fallback; `plan` is consumed
        // by the `as_any()` borrow below.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(exec, codec);
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                codec,
                proto_converter,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(empty, codec);
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty, codec,
            );
        }

        // CoalesceBatchesExec is deprecated upstream but must remain
        // serializable for backwards compatibility.
        #[expect(deprecated)]
        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                codec,
                proto_converter,
            );
        }

        // DataSourceExec wraps many source kinds; the helper returns
        // Ok(None) when this particular source is not serializable here.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                codec,
                proto_converter,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                codec,
                proto_converter,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        // Like DataSourceExec: the sink helper may decline with Ok(None).
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                codec,
                proto_converter,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>()
            && let Some(node) =
                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
            return protobuf::PhysicalPlanNode::try_from_async_func_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BufferExec>() {
            return protobuf::PhysicalPlanNode::try_from_buffer_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        // Fallback: not a built-in operator. Let the extension codec encode
        // the node itself, then recursively serialize its children and wrap
        // everything in a PhysicalExtensionNode.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
                            i,
                            codec,
                            proto_converter,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
599
600impl protobuf::PhysicalPlanNode {
601    fn try_into_explain_physical_plan(
602        &self,
603        explain: &protobuf::ExplainExecNode,
604        _ctx: &TaskContext,
605
606        _codec: &dyn PhysicalExtensionCodec,
607        _proto_converter: &dyn PhysicalProtoConverterExtension,
608    ) -> Result<Arc<dyn ExecutionPlan>> {
609        Ok(Arc::new(ExplainExec::new(
610            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
611            explain
612                .stringified_plans
613                .iter()
614                .map(|plan| plan.into())
615                .collect(),
616            explain.verbose,
617        )))
618    }
619
620    fn try_into_projection_physical_plan(
621        &self,
622        projection: &protobuf::ProjectionExecNode,
623        ctx: &TaskContext,
624
625        codec: &dyn PhysicalExtensionCodec,
626        proto_converter: &dyn PhysicalProtoConverterExtension,
627    ) -> Result<Arc<dyn ExecutionPlan>> {
628        let input: Arc<dyn ExecutionPlan> =
629            into_physical_plan(&projection.input, ctx, codec, proto_converter)?;
630        let exprs = projection
631            .expr
632            .iter()
633            .zip(projection.expr_name.iter())
634            .map(|(expr, name)| {
635                Ok((
636                    proto_converter.proto_to_physical_expr(
637                        expr,
638                        ctx,
639                        input.schema().as_ref(),
640                        codec,
641                    )?,
642                    name.to_string(),
643                ))
644            })
645            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
646        let proj_exprs: Vec<ProjectionExpr> = exprs
647            .into_iter()
648            .map(|(expr, alias)| ProjectionExpr { expr, alias })
649            .collect();
650        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
651    }
652
653    fn try_into_filter_physical_plan(
654        &self,
655        filter: &protobuf::FilterExecNode,
656        ctx: &TaskContext,
657
658        codec: &dyn PhysicalExtensionCodec,
659        proto_converter: &dyn PhysicalProtoConverterExtension,
660    ) -> Result<Arc<dyn ExecutionPlan>> {
661        let input: Arc<dyn ExecutionPlan> =
662            into_physical_plan(&filter.input, ctx, codec, proto_converter)?;
663
664        let predicate = filter
665            .expr
666            .as_ref()
667            .map(|expr| {
668                proto_converter.proto_to_physical_expr(
669                    expr,
670                    ctx,
671                    input.schema().as_ref(),
672                    codec,
673                )
674            })
675            .transpose()?
676            .ok_or_else(|| {
677                internal_datafusion_err!(
678                    "filter (FilterExecNode) in PhysicalPlanNode is missing."
679                )
680            })?;
681
682        let filter_selectivity = filter.default_filter_selectivity.try_into();
683        let projection = if !filter.projection.is_empty() {
684            Some(
685                filter
686                    .projection
687                    .iter()
688                    .map(|i| *i as usize)
689                    .collect::<Vec<_>>(),
690            )
691        } else {
692            None
693        };
694
695        let filter = FilterExecBuilder::new(predicate, input)
696            .apply_projection(projection)?
697            .with_batch_size(filter.batch_size as usize)
698            .with_fetch(filter.fetch.map(|f| f as usize))
699            .build()?;
700        match filter_selectivity {
701            Ok(filter_selectivity) => Ok(Arc::new(
702                filter.with_default_selectivity(filter_selectivity)?,
703            )),
704            Err(_) => Err(internal_datafusion_err!(
705                "filter_selectivity in PhysicalPlanNode is invalid "
706            )),
707        }
708    }
709
710    fn try_into_csv_scan_physical_plan(
711        &self,
712        scan: &protobuf::CsvScanExecNode,
713        ctx: &TaskContext,
714
715        codec: &dyn PhysicalExtensionCodec,
716        proto_converter: &dyn PhysicalProtoConverterExtension,
717    ) -> Result<Arc<dyn ExecutionPlan>> {
718        let escape =
719            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
720                &scan.optional_escape
721            {
722                Some(str_to_byte(escape, "escape")?)
723            } else {
724                None
725            };
726
727        let comment = if let Some(
728            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
729        ) = &scan.optional_comment
730        {
731            Some(str_to_byte(comment, "comment")?)
732        } else {
733            None
734        };
735
736        // Parse table schema with partition columns
737        let table_schema =
738            parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
739
740        let csv_options = CsvOptions {
741            has_header: Some(scan.has_header),
742            delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
743            quote: str_to_byte(&scan.quote, "quote")?,
744            newlines_in_values: Some(scan.newlines_in_values),
745            ..Default::default()
746        };
747        let source = Arc::new(
748            CsvSource::new(table_schema)
749                .with_csv_options(csv_options)
750                .with_escape(escape)
751                .with_comment(comment),
752        );
753
754        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
755            scan.base_conf.as_ref().unwrap(),
756            ctx,
757            codec,
758            proto_converter,
759            source,
760        )?)
761        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
762        .build();
763        Ok(DataSourceExec::from_data_source(conf))
764    }
765
766    fn try_into_json_scan_physical_plan(
767        &self,
768        scan: &protobuf::JsonScanExecNode,
769        ctx: &TaskContext,
770
771        codec: &dyn PhysicalExtensionCodec,
772        proto_converter: &dyn PhysicalProtoConverterExtension,
773    ) -> Result<Arc<dyn ExecutionPlan>> {
774        let base_conf = scan.base_conf.as_ref().unwrap();
775        let table_schema = parse_table_schema_from_proto(base_conf)?;
776        let scan_conf = parse_protobuf_file_scan_config(
777            base_conf,
778            ctx,
779            codec,
780            proto_converter,
781            Arc::new(JsonSource::new(table_schema)),
782        )?;
783        Ok(DataSourceExec::from_data_source(scan_conf))
784    }
785
786    fn try_into_arrow_scan_physical_plan(
787        &self,
788        scan: &protobuf::ArrowScanExecNode,
789        ctx: &TaskContext,
790        codec: &dyn PhysicalExtensionCodec,
791        proto_converter: &dyn PhysicalProtoConverterExtension,
792    ) -> Result<Arc<dyn ExecutionPlan>> {
793        let base_conf = scan.base_conf.as_ref().ok_or_else(|| {
794            internal_datafusion_err!("base_conf in ArrowScanExecNode is missing.")
795        })?;
796        let table_schema = parse_table_schema_from_proto(base_conf)?;
797        let scan_conf = parse_protobuf_file_scan_config(
798            base_conf,
799            ctx,
800            codec,
801            proto_converter,
802            Arc::new(ArrowSource::new_file_source(table_schema)),
803        )?;
804        Ok(DataSourceExec::from_data_source(scan_conf))
805    }
806
807    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
808    fn try_into_parquet_scan_physical_plan(
809        &self,
810        scan: &protobuf::ParquetScanExecNode,
811        ctx: &TaskContext,
812        codec: &dyn PhysicalExtensionCodec,
813        proto_converter: &dyn PhysicalProtoConverterExtension,
814    ) -> Result<Arc<dyn ExecutionPlan>> {
815        #[cfg(feature = "parquet")]
816        {
817            let schema = from_proto::parse_protobuf_file_scan_schema(
818                scan.base_conf.as_ref().unwrap(),
819            )?;
820
821            // Check if there's a projection and use projected schema for predicate parsing
822            let base_conf = scan.base_conf.as_ref().unwrap();
823            let predicate_schema = if !base_conf.projection.is_empty() {
824                // Create projected schema for parsing the predicate
825                let projected_fields: Vec<_> = base_conf
826                    .projection
827                    .iter()
828                    .map(|&i| schema.field(i as usize).clone())
829                    .collect();
830                Arc::new(Schema::new(projected_fields))
831            } else {
832                schema
833            };
834
835            let predicate = scan
836                .predicate
837                .as_ref()
838                .map(|expr| {
839                    proto_converter.proto_to_physical_expr(
840                        expr,
841                        ctx,
842                        predicate_schema.as_ref(),
843                        codec,
844                    )
845                })
846                .transpose()?;
847            let mut options = datafusion_common::config::TableParquetOptions::default();
848
849            if let Some(table_options) = scan.parquet_options.as_ref() {
850                options = table_options.try_into()?;
851            }
852
853            // Parse table schema with partition columns
854            let table_schema = parse_table_schema_from_proto(base_conf)?;
855            let object_store_url = match base_conf.object_store_url.is_empty() {
856                false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
857                true => ObjectStoreUrl::local_filesystem(),
858            };
859            let store = ctx.runtime_env().object_store(object_store_url)?;
860            let metadata_cache =
861                ctx.runtime_env().cache_manager.get_file_metadata_cache();
862            let reader_factory =
863                Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
864
865            let mut source = ParquetSource::new(table_schema)
866                .with_parquet_file_reader_factory(reader_factory)
867                .with_table_parquet_options(options);
868
869            if let Some(predicate) = predicate {
870                source = source.with_predicate(predicate);
871            }
872            let base_config = parse_protobuf_file_scan_config(
873                base_conf,
874                ctx,
875                codec,
876                proto_converter,
877                Arc::new(source),
878            )?;
879            Ok(DataSourceExec::from_data_source(base_config))
880        }
881        #[cfg(not(feature = "parquet"))]
882        panic!(
883            "Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled"
884        )
885    }
886
887    #[cfg_attr(not(feature = "avro"), expect(unused_variables))]
888    fn try_into_avro_scan_physical_plan(
889        &self,
890        scan: &protobuf::AvroScanExecNode,
891        ctx: &TaskContext,
892        codec: &dyn PhysicalExtensionCodec,
893        proto_converter: &dyn PhysicalProtoConverterExtension,
894    ) -> Result<Arc<dyn ExecutionPlan>> {
895        #[cfg(feature = "avro")]
896        {
897            let table_schema =
898                parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
899            let conf = parse_protobuf_file_scan_config(
900                scan.base_conf.as_ref().unwrap(),
901                ctx,
902                codec,
903                proto_converter,
904                Arc::new(AvroSource::new(table_schema)),
905            )?;
906            Ok(DataSourceExec::from_data_source(conf))
907        }
908
909        #[cfg(not(feature = "avro"))]
910        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
911    }
912
913    fn try_into_memory_scan_physical_plan(
914        &self,
915        scan: &protobuf::MemoryScanExecNode,
916        ctx: &TaskContext,
917
918        codec: &dyn PhysicalExtensionCodec,
919        proto_converter: &dyn PhysicalProtoConverterExtension,
920    ) -> Result<Arc<dyn ExecutionPlan>> {
921        let partitions = scan
922            .partitions
923            .iter()
924            .map(|p| parse_record_batches(p))
925            .collect::<Result<Vec<_>>>()?;
926
927        let proto_schema = scan.schema.as_ref().ok_or_else(|| {
928            internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
929        })?;
930        let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
931
932        let projection = if !scan.projection.is_empty() {
933            Some(
934                scan.projection
935                    .iter()
936                    .map(|i| *i as usize)
937                    .collect::<Vec<_>>(),
938            )
939        } else {
940            None
941        };
942
943        let mut sort_information = vec![];
944        for ordering in &scan.sort_information {
945            let sort_exprs = parse_physical_sort_exprs(
946                &ordering.physical_sort_expr_nodes,
947                ctx,
948                &schema,
949                codec,
950                proto_converter,
951            )?;
952            sort_information.extend(LexOrdering::new(sort_exprs));
953        }
954
955        let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
956            .with_limit(scan.fetch.map(|f| f as usize))
957            .with_show_sizes(scan.show_sizes);
958
959        let source = source.try_with_sort_information(sort_information)?;
960
961        Ok(DataSourceExec::from_data_source(source))
962    }
963
964    fn try_into_coalesce_batches_physical_plan(
965        &self,
966        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
967        ctx: &TaskContext,
968
969        codec: &dyn PhysicalExtensionCodec,
970        proto_converter: &dyn PhysicalProtoConverterExtension,
971    ) -> Result<Arc<dyn ExecutionPlan>> {
972        let input: Arc<dyn ExecutionPlan> =
973            into_physical_plan(&coalesce_batches.input, ctx, codec, proto_converter)?;
974        Ok(Arc::new(
975            #[expect(deprecated)]
976            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
977                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
978        ))
979    }
980
981    fn try_into_merge_physical_plan(
982        &self,
983        merge: &protobuf::CoalescePartitionsExecNode,
984        ctx: &TaskContext,
985
986        codec: &dyn PhysicalExtensionCodec,
987        proto_converter: &dyn PhysicalProtoConverterExtension,
988    ) -> Result<Arc<dyn ExecutionPlan>> {
989        let input: Arc<dyn ExecutionPlan> =
990            into_physical_plan(&merge.input, ctx, codec, proto_converter)?;
991        Ok(Arc::new(
992            CoalescePartitionsExec::new(input)
993                .with_fetch(merge.fetch.map(|f| f as usize)),
994        ))
995    }
996
997    fn try_into_repartition_physical_plan(
998        &self,
999        repart: &protobuf::RepartitionExecNode,
1000        ctx: &TaskContext,
1001
1002        codec: &dyn PhysicalExtensionCodec,
1003        proto_converter: &dyn PhysicalProtoConverterExtension,
1004    ) -> Result<Arc<dyn ExecutionPlan>> {
1005        let input: Arc<dyn ExecutionPlan> =
1006            into_physical_plan(&repart.input, ctx, codec, proto_converter)?;
1007        let partitioning = parse_protobuf_partitioning(
1008            repart.partitioning.as_ref(),
1009            ctx,
1010            input.schema().as_ref(),
1011            codec,
1012            proto_converter,
1013        )?;
1014        Ok(Arc::new(RepartitionExec::try_new(
1015            input,
1016            partitioning.unwrap(),
1017        )?))
1018    }
1019
1020    fn try_into_global_limit_physical_plan(
1021        &self,
1022        limit: &protobuf::GlobalLimitExecNode,
1023        ctx: &TaskContext,
1024
1025        codec: &dyn PhysicalExtensionCodec,
1026        proto_converter: &dyn PhysicalProtoConverterExtension,
1027    ) -> Result<Arc<dyn ExecutionPlan>> {
1028        let input: Arc<dyn ExecutionPlan> =
1029            into_physical_plan(&limit.input, ctx, codec, proto_converter)?;
1030        let fetch = if limit.fetch >= 0 {
1031            Some(limit.fetch as usize)
1032        } else {
1033            None
1034        };
1035        Ok(Arc::new(GlobalLimitExec::new(
1036            input,
1037            limit.skip as usize,
1038            fetch,
1039        )))
1040    }
1041
1042    fn try_into_local_limit_physical_plan(
1043        &self,
1044        limit: &protobuf::LocalLimitExecNode,
1045        ctx: &TaskContext,
1046
1047        codec: &dyn PhysicalExtensionCodec,
1048        proto_converter: &dyn PhysicalProtoConverterExtension,
1049    ) -> Result<Arc<dyn ExecutionPlan>> {
1050        let input: Arc<dyn ExecutionPlan> =
1051            into_physical_plan(&limit.input, ctx, codec, proto_converter)?;
1052        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
1053    }
1054
    /// Deserializes a `WindowAggExecNode` into a window execution plan.
    ///
    /// Produces a `BoundedWindowAggExec` when the message carries an input
    /// ordering mode (sorted / partially sorted / linear), and a plain
    /// `WindowAggExec` otherwise.
    fn try_into_window_physical_plan(
        &self,
        window_agg: &protobuf::WindowAggExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&window_agg.input, ctx, codec, proto_converter)?;
        let input_schema = input.schema();

        // Window expressions are resolved against the child plan's schema.
        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
            .window_expr
            .iter()
            .map(|window_expr| {
                parse_physical_window_expr(
                    window_expr,
                    ctx,
                    input_schema.as_ref(),
                    codec,
                    proto_converter,
                )
            })
            .collect::<Result<Vec<_>, _>>()?;

        // PARTITION BY keys; only their presence (non-emptiness) is consumed
        // below, as the `can_repartition` flag of the window exec.
        let partition_keys = window_agg
            .partition_keys
            .iter()
            .map(|expr| {
                proto_converter.proto_to_physical_expr(
                    expr,
                    ctx,
                    input.schema().as_ref(),
                    codec,
                )
            })
            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;

        // An input-order mode selects the bounded (streaming-friendly)
        // variant; otherwise fall back to the unbounded window exec.
        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
            let input_order_mode = match input_order_mode {
                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
                window_agg_exec_node::InputOrderMode::PartiallySorted(
                    protobuf::PartiallySortedInputOrderMode { columns },
                ) => InputOrderMode::PartiallySorted(
                    columns.iter().map(|c| *c as usize).collect(),
                ),
                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
            };

            Ok(Arc::new(BoundedWindowAggExec::try_new(
                physical_window_expr,
                input,
                input_order_mode,
                !partition_keys.is_empty(),
            )?))
        } else {
            Ok(Arc::new(WindowAggExec::try_new(
                physical_window_expr,
                input,
                !partition_keys.is_empty(),
            )?))
        }
    }
1119
    /// Deserializes an `AggregateExecNode` into an [`AggregateExec`],
    /// restoring the aggregate mode, grouping (including grouping sets),
    /// per-aggregate filters, UDAF-based aggregate expressions and the
    /// optional limit options.
    fn try_into_aggregate_physical_plan(
        &self,
        hash_agg: &protobuf::AggregateExecNode,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input: Arc<dyn ExecutionPlan> =
            into_physical_plan(&hash_agg.input, ctx, codec, proto_converter)?;
        // Decode the wire-format aggregate mode, then map it onto the
        // execution-time enum.
        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
            proto_error(format!(
                "Received a AggregateNode message with unknown AggregateMode {}",
                hash_agg.mode
            ))
        })?;
        let agg_mode: AggregateMode = match mode {
            protobuf::AggregateMode::Partial => AggregateMode::Partial,
            protobuf::AggregateMode::Final => AggregateMode::Final,
            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
            protobuf::AggregateMode::Single => AggregateMode::Single,
            protobuf::AggregateMode::SinglePartitioned => {
                AggregateMode::SinglePartitioned
            }
            protobuf::AggregateMode::PartialReduce => AggregateMode::PartialReduce,
        };

        let num_expr = hash_agg.group_expr.len();

        // Group-by expressions are paired positionally with their output
        // names and resolved against the input plan's schema.
        let group_expr = hash_agg
            .group_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                proto_converter
                    .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Null expressions (used by grouping sets) reuse the group names.
        let null_expr = hash_agg
            .null_expr
            .iter()
            .zip(hash_agg.group_expr_name.iter())
            .map(|(expr, name)| {
                proto_converter
                    .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)
                    .map(|expr| (expr, name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // The flat bool list encodes one row of null-masks per grouping set,
        // `num_expr` flags each. NOTE(review): `chunks(num_expr)` panics when
        // `num_expr == 0`; this assumes a non-empty `groups` implies at least
        // one group expression — confirm the serializer guarantees this.
        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
            hash_agg
                .groups
                .chunks(num_expr)
                .map(|g| g.to_vec())
                .collect::<Vec<Vec<bool>>>()
        } else {
            vec![]
        };

        let has_grouping_set = hash_agg.has_grouping_set;

        // The aggregate's input schema is serialized explicitly; filters and
        // aggregate expressions below are resolved against it.
        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
            internal_datafusion_err!("input_schema in AggregateNode is missing.")
        })?;
        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);

        // One optional FILTER (WHERE ...) expression per aggregate.
        let physical_filter_expr = hash_agg
            .filter_expr
            .iter()
            .map(|expr| {
                expr.expr
                    .as_ref()
                    .map(|e| {
                        proto_converter.proto_to_physical_expr(
                            e,
                            ctx,
                            &physical_schema,
                            codec,
                        )
                    })
                    .transpose()
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Rebuild each aggregate expression (all are UDAF-based on the wire).
        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
            .aggr_expr
            .iter()
            .zip(hash_agg.aggr_expr_name.iter())
            .map(|(expr, name)| {
                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                    proto_error("Unexpected empty aggregate physical expression")
                })?;

                match expr_type {
                    ExprType::AggregateExpr(agg_node) => {
                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                            .expr
                            .iter()
                            .map(|e| {
                                proto_converter.proto_to_physical_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    codec,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        let order_bys = agg_node
                            .ordering_req
                            .iter()
                            .map(|e| {
                                parse_physical_sort_expr(
                                    e,
                                    ctx,
                                    &physical_schema,
                                    codec,
                                    proto_converter,
                                )
                            })
                            .collect::<Result<_>>()?;
                        agg_node
                            .aggregate_function
                            .as_ref()
                            .map(|func| match func {
                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
                                    // Resolve the UDAF: a serialized payload
                                    // decodes via the codec; otherwise look it
                                    // up in the context, falling back to the
                                    // codec with an empty payload.
                                    let agg_udf = match &agg_node.fun_definition {
                                        Some(buf) => {
                                            codec.try_decode_udaf(udaf_name, buf)?
                                        }
                                        None => ctx.udaf(udaf_name).or_else(|_| {
                                            codec.try_decode_udaf(udaf_name, &[])
                                        })?,
                                    };

                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
                                        .schema(Arc::clone(&physical_schema))
                                        .alias(name)
                                        .human_display(agg_node.human_display.clone())
                                        .with_ignore_nulls(agg_node.ignore_nulls)
                                        .with_distinct(agg_node.distinct)
                                        .order_by(order_bys)
                                        .build()
                                        .map(Arc::new)
                                }
                            })
                            .transpose()?
                            .ok_or_else(|| {
                                proto_error(
                                    "Invalid AggregateExpr, missing aggregate_function",
                                )
                            })
                    }
                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        let agg = AggregateExec::try_new(
            agg_mode,
            PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set),
            physical_aggr_expr,
            physical_filter_expr,
            input,
            physical_schema,
        )?;

        // Restore the optional group-limit (possibly with a sort direction).
        let agg = if let Some(limit_proto) = &hash_agg.limit {
            let limit = limit_proto.limit as usize;
            let limit_options = match limit_proto.descending {
                Some(descending) => LimitOptions::new_with_order(limit, descending),
                None => LimitOptions::new(limit),
            };
            agg.with_limit_options(Some(limit_options))
        } else {
            agg
        };

        Ok(Arc::new(agg))
    }
1301
1302    fn try_into_hash_join_physical_plan(
1303        &self,
1304        hashjoin: &protobuf::HashJoinExecNode,
1305        ctx: &TaskContext,
1306
1307        codec: &dyn PhysicalExtensionCodec,
1308        proto_converter: &dyn PhysicalProtoConverterExtension,
1309    ) -> Result<Arc<dyn ExecutionPlan>> {
1310        let left: Arc<dyn ExecutionPlan> =
1311            into_physical_plan(&hashjoin.left, ctx, codec, proto_converter)?;
1312        let right: Arc<dyn ExecutionPlan> =
1313            into_physical_plan(&hashjoin.right, ctx, codec, proto_converter)?;
1314        let left_schema = left.schema();
1315        let right_schema = right.schema();
1316        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1317            .on
1318            .iter()
1319            .map(|col| {
1320                let left = proto_converter.proto_to_physical_expr(
1321                    &col.left.clone().unwrap(),
1322                    ctx,
1323                    left_schema.as_ref(),
1324                    codec,
1325                )?;
1326                let right = proto_converter.proto_to_physical_expr(
1327                    &col.right.clone().unwrap(),
1328                    ctx,
1329                    right_schema.as_ref(),
1330                    codec,
1331                )?;
1332                Ok((left, right))
1333            })
1334            .collect::<Result<_>>()?;
1335        let join_type =
1336            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1337                proto_error(format!(
1338                    "Received a HashJoinNode message with unknown JoinType {}",
1339                    hashjoin.join_type
1340                ))
1341            })?;
1342        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1343            .map_err(|_| {
1344                proto_error(format!(
1345                    "Received a HashJoinNode message with unknown NullEquality {}",
1346                    hashjoin.null_equality
1347                ))
1348            })?;
1349        let filter = hashjoin
1350            .filter
1351            .as_ref()
1352            .map(|f| {
1353                let schema = f
1354                    .schema
1355                    .as_ref()
1356                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1357                    .try_into()?;
1358
1359                let expression = proto_converter.proto_to_physical_expr(
1360                    f.expression.as_ref().ok_or_else(|| {
1361                        proto_error("Unexpected empty filter expression")
1362                    })?,
1363                    ctx, &schema,
1364                    codec,
1365                )?;
1366                let column_indices = f.column_indices
1367                    .iter()
1368                    .map(|i| {
1369                        let side = protobuf::JoinSide::try_from(i.side)
1370                            .map_err(|_| proto_error(format!(
1371                                "Received a HashJoinNode message with JoinSide in Filter {}",
1372                                i.side))
1373                            )?;
1374
1375                        Ok(ColumnIndex {
1376                            index: i.index as usize,
1377                            side: side.into(),
1378                        })
1379                    })
1380                    .collect::<Result<Vec<_>>>()?;
1381
1382                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1383            })
1384            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1385
1386        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1387            .map_err(|_| {
1388            proto_error(format!(
1389                "Received a HashJoinNode message with unknown PartitionMode {}",
1390                hashjoin.partition_mode
1391            ))
1392        })?;
1393        let partition_mode = match partition_mode {
1394            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1395            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1396            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1397        };
1398        let projection = if !hashjoin.projection.is_empty() {
1399            Some(
1400                hashjoin
1401                    .projection
1402                    .iter()
1403                    .map(|i| *i as usize)
1404                    .collect::<Vec<_>>(),
1405            )
1406        } else {
1407            None
1408        };
1409        Ok(Arc::new(HashJoinExec::try_new(
1410            left,
1411            right,
1412            on,
1413            filter,
1414            &join_type.into(),
1415            projection,
1416            partition_mode,
1417            null_equality.into(),
1418            hashjoin.null_aware,
1419        )?))
1420    }
1421
1422    fn try_into_symmetric_hash_join_physical_plan(
1423        &self,
1424        sym_join: &protobuf::SymmetricHashJoinExecNode,
1425        ctx: &TaskContext,
1426
1427        codec: &dyn PhysicalExtensionCodec,
1428        proto_converter: &dyn PhysicalProtoConverterExtension,
1429    ) -> Result<Arc<dyn ExecutionPlan>> {
1430        let left = into_physical_plan(&sym_join.left, ctx, codec, proto_converter)?;
1431        let right = into_physical_plan(&sym_join.right, ctx, codec, proto_converter)?;
1432        let left_schema = left.schema();
1433        let right_schema = right.schema();
1434        let on = sym_join
1435            .on
1436            .iter()
1437            .map(|col| {
1438                let left = proto_converter.proto_to_physical_expr(
1439                    &col.left.clone().unwrap(),
1440                    ctx,
1441                    left_schema.as_ref(),
1442                    codec,
1443                )?;
1444                let right = proto_converter.proto_to_physical_expr(
1445                    &col.right.clone().unwrap(),
1446                    ctx,
1447                    right_schema.as_ref(),
1448                    codec,
1449                )?;
1450                Ok((left, right))
1451            })
1452            .collect::<Result<_>>()?;
1453        let join_type =
1454            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1455                proto_error(format!(
1456                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1457                    sym_join.join_type
1458                ))
1459            })?;
1460        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1461            .map_err(|_| {
1462                proto_error(format!(
1463                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
1464                    sym_join.null_equality
1465                ))
1466            })?;
1467        let filter = sym_join
1468            .filter
1469            .as_ref()
1470            .map(|f| {
1471                let schema = f
1472                    .schema
1473                    .as_ref()
1474                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1475                    .try_into()?;
1476
1477                let expression = proto_converter.proto_to_physical_expr(
1478                    f.expression.as_ref().ok_or_else(|| {
1479                        proto_error("Unexpected empty filter expression")
1480                    })?,
1481                    ctx, &schema,
1482                    codec,
1483                )?;
1484                let column_indices = f.column_indices
1485                    .iter()
1486                    .map(|i| {
1487                        let side = protobuf::JoinSide::try_from(i.side)
1488                            .map_err(|_| proto_error(format!(
1489                                "Received a HashJoinNode message with JoinSide in Filter {}",
1490                                i.side))
1491                            )?;
1492
1493                        Ok(ColumnIndex {
1494                            index: i.index as usize,
1495                            side: side.into(),
1496                        })
1497                    })
1498                    .collect::<Result<_>>()?;
1499
1500                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1501            })
1502            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1503
1504        let left_sort_exprs = parse_physical_sort_exprs(
1505            &sym_join.left_sort_exprs,
1506            ctx,
1507            &left_schema,
1508            codec,
1509            proto_converter,
1510        )?;
1511        let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1512
1513        let right_sort_exprs = parse_physical_sort_exprs(
1514            &sym_join.right_sort_exprs,
1515            ctx,
1516            &right_schema,
1517            codec,
1518            proto_converter,
1519        )?;
1520        let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1521
1522        let partition_mode = protobuf::StreamPartitionMode::try_from(
1523            sym_join.partition_mode,
1524        )
1525        .map_err(|_| {
1526            proto_error(format!(
1527                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1528                sym_join.partition_mode
1529            ))
1530        })?;
1531        let partition_mode = match partition_mode {
1532            protobuf::StreamPartitionMode::SinglePartition => {
1533                StreamJoinPartitionMode::SinglePartition
1534            }
1535            protobuf::StreamPartitionMode::PartitionedExec => {
1536                StreamJoinPartitionMode::Partitioned
1537            }
1538        };
1539        SymmetricHashJoinExec::try_new(
1540            left,
1541            right,
1542            on,
1543            filter,
1544            &join_type.into(),
1545            null_equality.into(),
1546            left_sort_exprs,
1547            right_sort_exprs,
1548            partition_mode,
1549        )
1550        .map(|e| Arc::new(e) as _)
1551    }
1552
1553    fn try_into_union_physical_plan(
1554        &self,
1555        union: &protobuf::UnionExecNode,
1556        ctx: &TaskContext,
1557
1558        codec: &dyn PhysicalExtensionCodec,
1559        proto_converter: &dyn PhysicalProtoConverterExtension,
1560    ) -> Result<Arc<dyn ExecutionPlan>> {
1561        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1562        for input in &union.inputs {
1563            inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?);
1564        }
1565        UnionExec::try_new(inputs)
1566    }
1567
1568    fn try_into_interleave_physical_plan(
1569        &self,
1570        interleave: &protobuf::InterleaveExecNode,
1571        ctx: &TaskContext,
1572
1573        codec: &dyn PhysicalExtensionCodec,
1574        proto_converter: &dyn PhysicalProtoConverterExtension,
1575    ) -> Result<Arc<dyn ExecutionPlan>> {
1576        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1577        for input in &interleave.inputs {
1578            inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?);
1579        }
1580        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1581    }
1582
1583    fn try_into_cross_join_physical_plan(
1584        &self,
1585        crossjoin: &protobuf::CrossJoinExecNode,
1586        ctx: &TaskContext,
1587
1588        codec: &dyn PhysicalExtensionCodec,
1589        proto_converter: &dyn PhysicalProtoConverterExtension,
1590    ) -> Result<Arc<dyn ExecutionPlan>> {
1591        let left: Arc<dyn ExecutionPlan> =
1592            into_physical_plan(&crossjoin.left, ctx, codec, proto_converter)?;
1593        let right: Arc<dyn ExecutionPlan> =
1594            into_physical_plan(&crossjoin.right, ctx, codec, proto_converter)?;
1595        Ok(Arc::new(CrossJoinExec::new(left, right)))
1596    }
1597
1598    fn try_into_empty_physical_plan(
1599        &self,
1600        empty: &protobuf::EmptyExecNode,
1601        _ctx: &TaskContext,
1602
1603        _codec: &dyn PhysicalExtensionCodec,
1604        _proto_converter: &dyn PhysicalProtoConverterExtension,
1605    ) -> Result<Arc<dyn ExecutionPlan>> {
1606        let schema = Arc::new(convert_required!(empty.schema)?);
1607        Ok(Arc::new(EmptyExec::new(schema)))
1608    }
1609
1610    fn try_into_placeholder_row_physical_plan(
1611        &self,
1612        placeholder: &protobuf::PlaceholderRowExecNode,
1613        _ctx: &TaskContext,
1614
1615        _codec: &dyn PhysicalExtensionCodec,
1616    ) -> Result<Arc<dyn ExecutionPlan>> {
1617        let schema = Arc::new(convert_required!(placeholder.schema)?);
1618        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1619    }
1620
1621    fn try_into_sort_physical_plan(
1622        &self,
1623        sort: &protobuf::SortExecNode,
1624        ctx: &TaskContext,
1625
1626        codec: &dyn PhysicalExtensionCodec,
1627        proto_converter: &dyn PhysicalProtoConverterExtension,
1628    ) -> Result<Arc<dyn ExecutionPlan>> {
1629        let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?;
1630        let exprs = sort
1631            .expr
1632            .iter()
1633            .map(|expr| {
1634                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1635                    proto_error(format!(
1636                        "physical_plan::from_proto() Unexpected expr {self:?}"
1637                    ))
1638                })?;
1639                if let ExprType::Sort(sort_expr) = expr {
1640                    let expr = sort_expr
1641                        .expr
1642                        .as_ref()
1643                        .ok_or_else(|| {
1644                            proto_error(format!(
1645                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
1646                            ))
1647                        })?
1648                        .as_ref();
1649                    Ok(PhysicalSortExpr {
1650                        expr: proto_converter.proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)?,
1651                        options: SortOptions {
1652                            descending: !sort_expr.asc,
1653                            nulls_first: sort_expr.nulls_first,
1654                        },
1655                    })
1656                } else {
1657                    internal_err!(
1658                        "physical_plan::from_proto() {self:?}"
1659                    )
1660                }
1661            })
1662            .collect::<Result<Vec<_>>>()?;
1663        let Some(ordering) = LexOrdering::new(exprs) else {
1664            return internal_err!("SortExec requires an ordering");
1665        };
1666        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1667        let new_sort = SortExec::new(ordering, input)
1668            .with_fetch(fetch)
1669            .with_preserve_partitioning(sort.preserve_partitioning);
1670
1671        Ok(Arc::new(new_sort))
1672    }
1673
1674    fn try_into_sort_preserving_merge_physical_plan(
1675        &self,
1676        sort: &protobuf::SortPreservingMergeExecNode,
1677        ctx: &TaskContext,
1678
1679        codec: &dyn PhysicalExtensionCodec,
1680        proto_converter: &dyn PhysicalProtoConverterExtension,
1681    ) -> Result<Arc<dyn ExecutionPlan>> {
1682        let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?;
1683        let exprs = sort
1684            .expr
1685            .iter()
1686            .map(|expr| {
1687                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1688                    proto_error(format!(
1689                        "physical_plan::from_proto() Unexpected expr {self:?}"
1690                    ))
1691                })?;
1692                if let ExprType::Sort(sort_expr) = expr {
1693                    let expr = sort_expr
1694                        .expr
1695                        .as_ref()
1696                        .ok_or_else(|| {
1697                            proto_error(format!(
1698                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1699                        ))
1700                        })?
1701                        .as_ref();
1702                    Ok(PhysicalSortExpr {
1703                        expr: proto_converter.proto_to_physical_expr(
1704                            expr,
1705                            ctx,
1706                            input.schema().as_ref(),
1707                            codec,
1708                        )?,
1709                        options: SortOptions {
1710                            descending: !sort_expr.asc,
1711                            nulls_first: sort_expr.nulls_first,
1712                        },
1713                    })
1714                } else {
1715                    internal_err!("physical_plan::from_proto() {self:?}")
1716                }
1717            })
1718            .collect::<Result<Vec<_>>>()?;
1719        let Some(ordering) = LexOrdering::new(exprs) else {
1720            return internal_err!("SortExec requires an ordering");
1721        };
1722        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1723        Ok(Arc::new(
1724            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1725        ))
1726    }
1727
1728    fn try_into_extension_physical_plan(
1729        &self,
1730        extension: &protobuf::PhysicalExtensionNode,
1731        ctx: &TaskContext,
1732
1733        codec: &dyn PhysicalExtensionCodec,
1734        proto_converter: &dyn PhysicalProtoConverterExtension,
1735    ) -> Result<Arc<dyn ExecutionPlan>> {
1736        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1737            .inputs
1738            .iter()
1739            .map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i))
1740            .collect::<Result<_>>()?;
1741
1742        let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;
1743
1744        Ok(extension_node)
1745    }
1746
1747    fn try_into_nested_loop_join_physical_plan(
1748        &self,
1749        join: &protobuf::NestedLoopJoinExecNode,
1750        ctx: &TaskContext,
1751
1752        codec: &dyn PhysicalExtensionCodec,
1753        proto_converter: &dyn PhysicalProtoConverterExtension,
1754    ) -> Result<Arc<dyn ExecutionPlan>> {
1755        let left: Arc<dyn ExecutionPlan> =
1756            into_physical_plan(&join.left, ctx, codec, proto_converter)?;
1757        let right: Arc<dyn ExecutionPlan> =
1758            into_physical_plan(&join.right, ctx, codec, proto_converter)?;
1759        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1760            proto_error(format!(
1761                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1762                join.join_type
1763            ))
1764        })?;
1765        let filter = join
1766                    .filter
1767                    .as_ref()
1768                    .map(|f| {
1769                        let schema = f
1770                            .schema
1771                            .as_ref()
1772                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1773                            .try_into()?;
1774
1775                        let expression = proto_converter.proto_to_physical_expr(
1776                            f.expression.as_ref().ok_or_else(|| {
1777                                proto_error("Unexpected empty filter expression")
1778                            })?,
1779                            ctx, &schema,
1780                            codec,
1781                        )?;
1782                        let column_indices = f.column_indices
1783                            .iter()
1784                            .map(|i| {
1785                                let side = protobuf::JoinSide::try_from(i.side)
1786                                    .map_err(|_| proto_error(format!(
1787                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1788                                        i.side))
1789                                    )?;
1790
1791                                Ok(ColumnIndex {
1792                                    index: i.index as usize,
1793                                    side: side.into(),
1794                                })
1795                            })
1796                            .collect::<Result<Vec<_>>>()?;
1797
1798                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1799                    })
1800                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1801
1802        let projection = if !join.projection.is_empty() {
1803            Some(
1804                join.projection
1805                    .iter()
1806                    .map(|i| *i as usize)
1807                    .collect::<Vec<_>>(),
1808            )
1809        } else {
1810            None
1811        };
1812
1813        Ok(Arc::new(NestedLoopJoinExec::try_new(
1814            left,
1815            right,
1816            filter,
1817            &join_type.into(),
1818            projection,
1819        )?))
1820    }
1821
1822    fn try_into_analyze_physical_plan(
1823        &self,
1824        analyze: &protobuf::AnalyzeExecNode,
1825        ctx: &TaskContext,
1826
1827        codec: &dyn PhysicalExtensionCodec,
1828        proto_converter: &dyn PhysicalProtoConverterExtension,
1829    ) -> Result<Arc<dyn ExecutionPlan>> {
1830        let input: Arc<dyn ExecutionPlan> =
1831            into_physical_plan(&analyze.input, ctx, codec, proto_converter)?;
1832        Ok(Arc::new(AnalyzeExec::new(
1833            analyze.verbose,
1834            analyze.show_statistics,
1835            vec![MetricType::SUMMARY, MetricType::DEV],
1836            input,
1837            Arc::new(convert_required!(analyze.schema)?),
1838        )))
1839    }
1840
1841    fn try_into_json_sink_physical_plan(
1842        &self,
1843        sink: &protobuf::JsonSinkExecNode,
1844        ctx: &TaskContext,
1845
1846        codec: &dyn PhysicalExtensionCodec,
1847        proto_converter: &dyn PhysicalProtoConverterExtension,
1848    ) -> Result<Arc<dyn ExecutionPlan>> {
1849        let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?;
1850
1851        let data_sink: JsonSink = sink
1852            .sink
1853            .as_ref()
1854            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1855            .try_into()?;
1856        let sink_schema = input.schema();
1857        let sort_order = sink
1858            .sort_order
1859            .as_ref()
1860            .map(|collection| {
1861                parse_physical_sort_exprs(
1862                    &collection.physical_sort_expr_nodes,
1863                    ctx,
1864                    &sink_schema,
1865                    codec,
1866                    proto_converter,
1867                )
1868                .map(|sort_exprs| {
1869                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1870                })
1871            })
1872            .transpose()?
1873            .flatten();
1874        Ok(Arc::new(DataSinkExec::new(
1875            input,
1876            Arc::new(data_sink),
1877            sort_order,
1878        )))
1879    }
1880
1881    fn try_into_csv_sink_physical_plan(
1882        &self,
1883        sink: &protobuf::CsvSinkExecNode,
1884        ctx: &TaskContext,
1885
1886        codec: &dyn PhysicalExtensionCodec,
1887        proto_converter: &dyn PhysicalProtoConverterExtension,
1888    ) -> Result<Arc<dyn ExecutionPlan>> {
1889        let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?;
1890
1891        let data_sink: CsvSink = sink
1892            .sink
1893            .as_ref()
1894            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1895            .try_into()?;
1896        let sink_schema = input.schema();
1897        let sort_order = sink
1898            .sort_order
1899            .as_ref()
1900            .map(|collection| {
1901                parse_physical_sort_exprs(
1902                    &collection.physical_sort_expr_nodes,
1903                    ctx,
1904                    &sink_schema,
1905                    codec,
1906                    proto_converter,
1907                )
1908                .map(|sort_exprs| {
1909                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1910                })
1911            })
1912            .transpose()?
1913            .flatten();
1914        Ok(Arc::new(DataSinkExec::new(
1915            input,
1916            Arc::new(data_sink),
1917            sort_order,
1918        )))
1919    }
1920
1921    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
1922    fn try_into_parquet_sink_physical_plan(
1923        &self,
1924        sink: &protobuf::ParquetSinkExecNode,
1925        ctx: &TaskContext,
1926
1927        codec: &dyn PhysicalExtensionCodec,
1928        proto_converter: &dyn PhysicalProtoConverterExtension,
1929    ) -> Result<Arc<dyn ExecutionPlan>> {
1930        #[cfg(feature = "parquet")]
1931        {
1932            let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?;
1933
1934            let data_sink: ParquetSink = sink
1935                .sink
1936                .as_ref()
1937                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1938                .try_into()?;
1939            let sink_schema = input.schema();
1940            let sort_order = sink
1941                .sort_order
1942                .as_ref()
1943                .map(|collection| {
1944                    parse_physical_sort_exprs(
1945                        &collection.physical_sort_expr_nodes,
1946                        ctx,
1947                        &sink_schema,
1948                        codec,
1949                        proto_converter,
1950                    )
1951                    .map(|sort_exprs| {
1952                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1953                    })
1954                })
1955                .transpose()?
1956                .flatten();
1957            Ok(Arc::new(DataSinkExec::new(
1958                input,
1959                Arc::new(data_sink),
1960                sort_order,
1961            )))
1962        }
1963        #[cfg(not(feature = "parquet"))]
1964        panic!("Trying to use ParquetSink without `parquet` feature enabled");
1965    }
1966
1967    fn try_into_unnest_physical_plan(
1968        &self,
1969        unnest: &protobuf::UnnestExecNode,
1970        ctx: &TaskContext,
1971
1972        codec: &dyn PhysicalExtensionCodec,
1973        proto_converter: &dyn PhysicalProtoConverterExtension,
1974    ) -> Result<Arc<dyn ExecutionPlan>> {
1975        let input = into_physical_plan(&unnest.input, ctx, codec, proto_converter)?;
1976
1977        Ok(Arc::new(UnnestExec::new(
1978            input,
1979            unnest
1980                .list_type_columns
1981                .iter()
1982                .map(|c| ListUnnest {
1983                    index_in_input_schema: c.index_in_input_schema as _,
1984                    depth: c.depth as _,
1985                })
1986                .collect(),
1987            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1988            Arc::new(convert_required!(unnest.schema)?),
1989            into_required!(unnest.options)?,
1990        )?))
1991    }
1992
1993    fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
1994        match name {
1995            protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
1996            protobuf::GenerateSeriesName::GsRange => "range",
1997        }
1998    }
1999    fn try_into_sort_join(
2000        &self,
2001        sort_join: &SortMergeJoinExecNode,
2002        ctx: &TaskContext,
2003
2004        codec: &dyn PhysicalExtensionCodec,
2005        proto_converter: &dyn PhysicalProtoConverterExtension,
2006    ) -> Result<Arc<dyn ExecutionPlan>> {
2007        let left = into_physical_plan(&sort_join.left, ctx, codec, proto_converter)?;
2008        let left_schema = left.schema();
2009        let right = into_physical_plan(&sort_join.right, ctx, codec, proto_converter)?;
2010        let right_schema = right.schema();
2011
2012        let filter = sort_join
2013            .filter
2014            .as_ref()
2015            .map(|f| {
2016                let schema = f
2017                    .schema
2018                    .as_ref()
2019                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
2020                    .try_into()?;
2021
2022                let expression = proto_converter.proto_to_physical_expr(
2023                    f.expression.as_ref().ok_or_else(|| {
2024                        proto_error("Unexpected empty filter expression")
2025                    })?,
2026                    ctx,
2027                    &schema,
2028                    codec,
2029                )?;
2030                let column_indices = f
2031                    .column_indices
2032                    .iter()
2033                    .map(|i| {
2034                        let side =
2035                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
2036                                proto_error(format!(
2037                                    "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
2038                                    i.side
2039                                ))
2040                            })?;
2041
2042                        Ok(ColumnIndex {
2043                            index: i.index as usize,
2044                            side: side.into(),
2045                        })
2046                    })
2047                    .collect::<Result<Vec<_>>>()?;
2048
2049                Ok(JoinFilter::new(
2050                    expression,
2051                    column_indices,
2052                    Arc::new(schema),
2053                ))
2054            })
2055            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
2056
2057        let join_type =
2058            protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
2059                proto_error(format!(
2060                    "Received a SortMergeJoinExecNode message with unknown JoinType {}",
2061                    sort_join.join_type
2062                ))
2063            })?;
2064
2065        let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
2066            .map_err(|_| {
2067                proto_error(format!(
2068                    "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
2069                    sort_join.null_equality
2070                ))
2071            })?;
2072
2073        let sort_options = sort_join
2074            .sort_options
2075            .iter()
2076            .map(|e| SortOptions {
2077                descending: !e.asc,
2078                nulls_first: e.nulls_first,
2079            })
2080            .collect();
2081        let on = sort_join
2082            .on
2083            .iter()
2084            .map(|col| {
2085                let left = proto_converter.proto_to_physical_expr(
2086                    &col.left.clone().unwrap(),
2087                    ctx,
2088                    left_schema.as_ref(),
2089                    codec,
2090                )?;
2091                let right = proto_converter.proto_to_physical_expr(
2092                    &col.right.clone().unwrap(),
2093                    ctx,
2094                    right_schema.as_ref(),
2095                    codec,
2096                )?;
2097                Ok((left, right))
2098            })
2099            .collect::<Result<_>>()?;
2100
2101        Ok(Arc::new(SortMergeJoinExec::try_new(
2102            left,
2103            right,
2104            on,
2105            filter,
2106            join_type.into(),
2107            sort_options,
2108            null_equality.into(),
2109        )?))
2110    }
2111
    /// Reconstructs a [`LazyMemoryExec`] backed by a `GenerateSeriesTable`
    /// from its protobuf node, dispatching on which argument variant
    /// (null / int64 / timestamp / date) was serialized.
    fn try_into_generate_series_physical_plan(
        &self,
        generate_series: &protobuf::GenerateSeriesNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);

        let args = match &generate_series.args {
            // A call where some argument was NULL: produces an empty/null
            // series, so only the function name is carried.
            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
                GenSeriesArgs::ContainsNull {
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
                GenSeriesArgs::Int64Args {
                    start: args.start,
                    end: args.end,
                    step: args.step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
                // `step` is a required interval sub-message; reject if absent.
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    internal_datafusion_err!("Missing step in TimestampArgs")
                })?;
                // Re-pack the (months, days, nanos) triple into the native
                // IntervalMonthDayNano representation.
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::TimestampArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
                let step_proto = args.step.as_ref().ok_or_else(|| {
                    internal_datafusion_err!("Missing step in DateArgs")
                })?;
                let step = IntervalMonthDayNanoType::make_value(
                    step_proto.months,
                    step_proto.days,
                    step_proto.nanos,
                );
                GenSeriesArgs::DateArgs {
                    start: args.start,
                    end: args.end,
                    step,
                    include_end: args.include_end,
                    name: Self::generate_series_name_to_str(args.name()),
                }
            }
            None => return internal_err!("Missing args in GenerateSeriesNode"),
        };

        // Wrap the args in a lazily-evaluated table and expose it as a
        // single-generator LazyMemoryExec.
        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
        let generator = table.as_generator(generate_series.target_batch_size as usize)?;

        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
    }
2176
2177    fn try_into_cooperative_physical_plan(
2178        &self,
2179        field_stream: &protobuf::CooperativeExecNode,
2180        ctx: &TaskContext,
2181
2182        codec: &dyn PhysicalExtensionCodec,
2183        proto_converter: &dyn PhysicalProtoConverterExtension,
2184    ) -> Result<Arc<dyn ExecutionPlan>> {
2185        let input = into_physical_plan(&field_stream.input, ctx, codec, proto_converter)?;
2186        Ok(Arc::new(CooperativeExec::new(input)))
2187    }
2188
2189    fn try_into_async_func_physical_plan(
2190        &self,
2191        async_func: &protobuf::AsyncFuncExecNode,
2192        ctx: &TaskContext,
2193        codec: &dyn PhysicalExtensionCodec,
2194        proto_converter: &dyn PhysicalProtoConverterExtension,
2195    ) -> Result<Arc<dyn ExecutionPlan>> {
2196        let input: Arc<dyn ExecutionPlan> =
2197            into_physical_plan(&async_func.input, ctx, codec, proto_converter)?;
2198
2199        if async_func.async_exprs.len() != async_func.async_expr_names.len() {
2200            return internal_err!(
2201                "AsyncFuncExecNode async_exprs length does not match async_expr_names"
2202            );
2203        }
2204
2205        let async_exprs = async_func
2206            .async_exprs
2207            .iter()
2208            .zip(async_func.async_expr_names.iter())
2209            .map(|(expr, name)| {
2210                let physical_expr = proto_converter.proto_to_physical_expr(
2211                    expr,
2212                    ctx,
2213                    input.schema().as_ref(),
2214                    codec,
2215                )?;
2216
2217                Ok(Arc::new(AsyncFuncExpr::try_new(
2218                    name.clone(),
2219                    physical_expr,
2220                    input.schema().as_ref(),
2221                )?))
2222            })
2223            .collect::<Result<Vec<_>>>()?;
2224
2225        Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
2226    }
2227
2228    fn try_into_buffer_physical_plan(
2229        &self,
2230        buffer: &protobuf::BufferExecNode,
2231        ctx: &TaskContext,
2232        extension_codec: &dyn PhysicalExtensionCodec,
2233        proto_converter: &dyn PhysicalProtoConverterExtension,
2234    ) -> Result<Arc<dyn ExecutionPlan>> {
2235        let input: Arc<dyn ExecutionPlan> =
2236            into_physical_plan(&buffer.input, ctx, extension_codec, proto_converter)?;
2237
2238        Ok(Arc::new(BufferExec::new(input, buffer.capacity as usize)))
2239    }
2240
2241    fn try_from_explain_exec(
2242        exec: &ExplainExec,
2243        _codec: &dyn PhysicalExtensionCodec,
2244    ) -> Result<Self> {
2245        Ok(protobuf::PhysicalPlanNode {
2246            physical_plan_type: Some(PhysicalPlanType::Explain(
2247                protobuf::ExplainExecNode {
2248                    schema: Some(exec.schema().as_ref().try_into()?),
2249                    stringified_plans: exec
2250                        .stringified_plans()
2251                        .iter()
2252                        .map(|plan| plan.into())
2253                        .collect(),
2254                    verbose: exec.verbose(),
2255                },
2256            )),
2257        })
2258    }
2259
2260    fn try_from_projection_exec(
2261        exec: &ProjectionExec,
2262        codec: &dyn PhysicalExtensionCodec,
2263        proto_converter: &dyn PhysicalProtoConverterExtension,
2264    ) -> Result<Self> {
2265        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2266            exec.input().to_owned(),
2267            codec,
2268            proto_converter,
2269        )?;
2270        let expr = exec
2271            .expr()
2272            .iter()
2273            .map(|proj_expr| {
2274                proto_converter.physical_expr_to_proto(&proj_expr.expr, codec)
2275            })
2276            .collect::<Result<Vec<_>>>()?;
2277        let expr_name = exec
2278            .expr()
2279            .iter()
2280            .map(|proj_expr| proj_expr.alias.clone())
2281            .collect();
2282        Ok(protobuf::PhysicalPlanNode {
2283            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
2284                protobuf::ProjectionExecNode {
2285                    input: Some(Box::new(input)),
2286                    expr,
2287                    expr_name,
2288                },
2289            ))),
2290        })
2291    }
2292
2293    fn try_from_analyze_exec(
2294        exec: &AnalyzeExec,
2295        codec: &dyn PhysicalExtensionCodec,
2296        proto_converter: &dyn PhysicalProtoConverterExtension,
2297    ) -> Result<Self> {
2298        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2299            exec.input().to_owned(),
2300            codec,
2301            proto_converter,
2302        )?;
2303        Ok(protobuf::PhysicalPlanNode {
2304            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2305                protobuf::AnalyzeExecNode {
2306                    verbose: exec.verbose(),
2307                    show_statistics: exec.show_statistics(),
2308                    input: Some(Box::new(input)),
2309                    schema: Some(exec.schema().as_ref().try_into()?),
2310                },
2311            ))),
2312        })
2313    }
2314
2315    fn try_from_filter_exec(
2316        exec: &FilterExec,
2317        codec: &dyn PhysicalExtensionCodec,
2318        proto_converter: &dyn PhysicalProtoConverterExtension,
2319    ) -> Result<Self> {
2320        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2321            exec.input().to_owned(),
2322            codec,
2323            proto_converter,
2324        )?;
2325        Ok(protobuf::PhysicalPlanNode {
2326            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2327                protobuf::FilterExecNode {
2328                    input: Some(Box::new(input)),
2329                    expr: Some(
2330                        proto_converter
2331                            .physical_expr_to_proto(exec.predicate(), codec)?,
2332                    ),
2333                    default_filter_selectivity: exec.default_selectivity() as u32,
2334                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
2335                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2336                    }),
2337                    batch_size: exec.batch_size() as u32,
2338                    fetch: exec.fetch().map(|f| f as u32),
2339                },
2340            ))),
2341        })
2342    }
2343
2344    fn try_from_global_limit_exec(
2345        limit: &GlobalLimitExec,
2346        codec: &dyn PhysicalExtensionCodec,
2347        proto_converter: &dyn PhysicalProtoConverterExtension,
2348    ) -> Result<Self> {
2349        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2350            limit.input().to_owned(),
2351            codec,
2352            proto_converter,
2353        )?;
2354
2355        Ok(protobuf::PhysicalPlanNode {
2356            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2357                protobuf::GlobalLimitExecNode {
2358                    input: Some(Box::new(input)),
2359                    skip: limit.skip() as u32,
2360                    fetch: match limit.fetch() {
2361                        Some(n) => n as i64,
2362                        _ => -1, // no limit
2363                    },
2364                },
2365            ))),
2366        })
2367    }
2368
2369    fn try_from_local_limit_exec(
2370        limit: &LocalLimitExec,
2371        codec: &dyn PhysicalExtensionCodec,
2372        proto_converter: &dyn PhysicalProtoConverterExtension,
2373    ) -> Result<Self> {
2374        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2375            limit.input().to_owned(),
2376            codec,
2377            proto_converter,
2378        )?;
2379        Ok(protobuf::PhysicalPlanNode {
2380            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2381                protobuf::LocalLimitExecNode {
2382                    input: Some(Box::new(input)),
2383                    fetch: limit.fetch() as u32,
2384                },
2385            ))),
2386        })
2387    }
2388
2389    fn try_from_hash_join_exec(
2390        exec: &HashJoinExec,
2391        codec: &dyn PhysicalExtensionCodec,
2392        proto_converter: &dyn PhysicalProtoConverterExtension,
2393    ) -> Result<Self> {
2394        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2395            exec.left().to_owned(),
2396            codec,
2397            proto_converter,
2398        )?;
2399        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2400            exec.right().to_owned(),
2401            codec,
2402            proto_converter,
2403        )?;
2404        let on: Vec<protobuf::JoinOn> = exec
2405            .on()
2406            .iter()
2407            .map(|tuple| {
2408                let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2409                let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2410                Ok::<_, DataFusionError>(protobuf::JoinOn {
2411                    left: Some(l),
2412                    right: Some(r),
2413                })
2414            })
2415            .collect::<Result<_>>()?;
2416        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2417        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2418        let filter = exec
2419            .filter()
2420            .as_ref()
2421            .map(|f| {
2422                let expression =
2423                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2424                let column_indices = f
2425                    .column_indices()
2426                    .iter()
2427                    .map(|i| {
2428                        let side: protobuf::JoinSide = i.side.to_owned().into();
2429                        protobuf::ColumnIndex {
2430                            index: i.index as u32,
2431                            side: side.into(),
2432                        }
2433                    })
2434                    .collect();
2435                let schema = f.schema().as_ref().try_into()?;
2436                Ok(protobuf::JoinFilter {
2437                    expression: Some(expression),
2438                    column_indices,
2439                    schema: Some(schema),
2440                })
2441            })
2442            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2443
2444        let partition_mode = match exec.partition_mode() {
2445            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2446            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2447            PartitionMode::Auto => protobuf::PartitionMode::Auto,
2448        };
2449
2450        Ok(protobuf::PhysicalPlanNode {
2451            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2452                protobuf::HashJoinExecNode {
2453                    left: Some(Box::new(left)),
2454                    right: Some(Box::new(right)),
2455                    on,
2456                    join_type: join_type.into(),
2457                    partition_mode: partition_mode.into(),
2458                    null_equality: null_equality.into(),
2459                    filter,
2460                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2461                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2462                    }),
2463                    null_aware: exec.null_aware,
2464                },
2465            ))),
2466        })
2467    }
2468
2469    fn try_from_symmetric_hash_join_exec(
2470        exec: &SymmetricHashJoinExec,
2471        codec: &dyn PhysicalExtensionCodec,
2472        proto_converter: &dyn PhysicalProtoConverterExtension,
2473    ) -> Result<Self> {
2474        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2475            exec.left().to_owned(),
2476            codec,
2477            proto_converter,
2478        )?;
2479        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2480            exec.right().to_owned(),
2481            codec,
2482            proto_converter,
2483        )?;
2484        let on = exec
2485            .on()
2486            .iter()
2487            .map(|tuple| {
2488                let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2489                let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2490                Ok::<_, DataFusionError>(protobuf::JoinOn {
2491                    left: Some(l),
2492                    right: Some(r),
2493                })
2494            })
2495            .collect::<Result<_>>()?;
2496        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2497        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2498        let filter = exec
2499            .filter()
2500            .as_ref()
2501            .map(|f| {
2502                let expression =
2503                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2504                let column_indices = f
2505                    .column_indices()
2506                    .iter()
2507                    .map(|i| {
2508                        let side: protobuf::JoinSide = i.side.to_owned().into();
2509                        protobuf::ColumnIndex {
2510                            index: i.index as u32,
2511                            side: side.into(),
2512                        }
2513                    })
2514                    .collect();
2515                let schema = f.schema().as_ref().try_into()?;
2516                Ok(protobuf::JoinFilter {
2517                    expression: Some(expression),
2518                    column_indices,
2519                    schema: Some(schema),
2520                })
2521            })
2522            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2523
2524        let partition_mode = match exec.partition_mode() {
2525            StreamJoinPartitionMode::SinglePartition => {
2526                protobuf::StreamPartitionMode::SinglePartition
2527            }
2528            StreamJoinPartitionMode::Partitioned => {
2529                protobuf::StreamPartitionMode::PartitionedExec
2530            }
2531        };
2532
2533        let left_sort_exprs = exec
2534            .left_sort_exprs()
2535            .map(|exprs| {
2536                exprs
2537                    .iter()
2538                    .map(|expr| {
2539                        Ok(protobuf::PhysicalSortExprNode {
2540                            expr: Some(Box::new(
2541                                proto_converter
2542                                    .physical_expr_to_proto(&expr.expr, codec)?,
2543                            )),
2544                            asc: !expr.options.descending,
2545                            nulls_first: expr.options.nulls_first,
2546                        })
2547                    })
2548                    .collect::<Result<Vec<_>>>()
2549            })
2550            .transpose()?
2551            .unwrap_or(vec![]);
2552
2553        let right_sort_exprs = exec
2554            .right_sort_exprs()
2555            .map(|exprs| {
2556                exprs
2557                    .iter()
2558                    .map(|expr| {
2559                        Ok(protobuf::PhysicalSortExprNode {
2560                            expr: Some(Box::new(
2561                                proto_converter
2562                                    .physical_expr_to_proto(&expr.expr, codec)?,
2563                            )),
2564                            asc: !expr.options.descending,
2565                            nulls_first: expr.options.nulls_first,
2566                        })
2567                    })
2568                    .collect::<Result<Vec<_>>>()
2569            })
2570            .transpose()?
2571            .unwrap_or(vec![]);
2572
2573        Ok(protobuf::PhysicalPlanNode {
2574            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2575                protobuf::SymmetricHashJoinExecNode {
2576                    left: Some(Box::new(left)),
2577                    right: Some(Box::new(right)),
2578                    on,
2579                    join_type: join_type.into(),
2580                    partition_mode: partition_mode.into(),
2581                    null_equality: null_equality.into(),
2582                    left_sort_exprs,
2583                    right_sort_exprs,
2584                    filter,
2585                },
2586            ))),
2587        })
2588    }
2589
2590    fn try_from_sort_merge_join_exec(
2591        exec: &SortMergeJoinExec,
2592        codec: &dyn PhysicalExtensionCodec,
2593        proto_converter: &dyn PhysicalProtoConverterExtension,
2594    ) -> Result<Self> {
2595        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2596            exec.left().to_owned(),
2597            codec,
2598            proto_converter,
2599        )?;
2600        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2601            exec.right().to_owned(),
2602            codec,
2603            proto_converter,
2604        )?;
2605        let on = exec
2606            .on()
2607            .iter()
2608            .map(|tuple| {
2609                let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2610                let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2611                Ok::<_, DataFusionError>(protobuf::JoinOn {
2612                    left: Some(l),
2613                    right: Some(r),
2614                })
2615            })
2616            .collect::<Result<_>>()?;
2617        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2618        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2619        let filter = exec
2620            .filter()
2621            .as_ref()
2622            .map(|f| {
2623                let expression =
2624                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2625                let column_indices = f
2626                    .column_indices()
2627                    .iter()
2628                    .map(|i| {
2629                        let side: protobuf::JoinSide = i.side.to_owned().into();
2630                        protobuf::ColumnIndex {
2631                            index: i.index as u32,
2632                            side: side.into(),
2633                        }
2634                    })
2635                    .collect();
2636                let schema = f.schema().as_ref().try_into()?;
2637                Ok(protobuf::JoinFilter {
2638                    expression: Some(expression),
2639                    column_indices,
2640                    schema: Some(schema),
2641                })
2642            })
2643            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2644
2645        let sort_options = exec
2646            .sort_options()
2647            .iter()
2648            .map(
2649                |SortOptions {
2650                     descending,
2651                     nulls_first,
2652                 }| {
2653                    SortExprNode {
2654                        expr: None,
2655                        asc: !*descending,
2656                        nulls_first: *nulls_first,
2657                    }
2658                },
2659            )
2660            .collect();
2661
2662        Ok(protobuf::PhysicalPlanNode {
2663            physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2664                SortMergeJoinExecNode {
2665                    left: Some(Box::new(left)),
2666                    right: Some(Box::new(right)),
2667                    on,
2668                    join_type: join_type.into(),
2669                    null_equality: null_equality.into(),
2670                    filter,
2671                    sort_options,
2672                },
2673            ))),
2674        })
2675    }
2676
2677    fn try_from_cross_join_exec(
2678        exec: &CrossJoinExec,
2679        codec: &dyn PhysicalExtensionCodec,
2680        proto_converter: &dyn PhysicalProtoConverterExtension,
2681    ) -> Result<Self> {
2682        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2683            exec.left().to_owned(),
2684            codec,
2685            proto_converter,
2686        )?;
2687        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2688            exec.right().to_owned(),
2689            codec,
2690            proto_converter,
2691        )?;
2692        Ok(protobuf::PhysicalPlanNode {
2693            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2694                protobuf::CrossJoinExecNode {
2695                    left: Some(Box::new(left)),
2696                    right: Some(Box::new(right)),
2697                },
2698            ))),
2699        })
2700    }
2701
    /// Serializes an [`AggregateExec`] into a [`protobuf::PhysicalPlanNode`].
    ///
    /// Captures the grouping expressions (with their output names, null
    /// expressions, and grouping-set flags), the aggregate expressions with
    /// their names and optional per-aggregate filters, the aggregation mode,
    /// the optional limit configuration, and the serialized input plan plus
    /// its schema.
    fn try_from_aggregate_exec(
        exec: &AggregateExec,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Self> {
        // Grouping-set flags flattened into a single vector for the proto
        // encoding; the layout mirrors `group_expr().groups()`.
        let groups: Vec<bool> = exec
            .group_expr()
            .groups()
            .iter()
            .flatten()
            .copied()
            .collect();

        // Output column name of each group expression (the `.1` of each
        // (expr, name) pair).
        let group_names = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| expr.1.to_owned())
            .collect();

        // Optional FILTER clause for each aggregate expression, serialized
        // positionally (a `None` entry means "no filter" for that aggregate).
        let filter = exec
            .filter_expr()
            .iter()
            .map(|expr| serialize_maybe_filter(expr.to_owned(), codec, proto_converter))
            .collect::<Result<Vec<_>>>()?;

        // The aggregate expressions themselves.
        let agg = exec
            .aggr_expr()
            .iter()
            .map(|expr| {
                serialize_physical_aggr_expr(expr.to_owned(), codec, proto_converter)
            })
            .collect::<Result<Vec<_>>>()?;

        // Display names of the aggregates, kept parallel to `agg`.
        let agg_names = exec
            .aggr_expr()
            .iter()
            .map(|expr| expr.name().to_string())
            .collect::<Vec<_>>();

        // One-to-one mapping of execution-time aggregation modes onto the
        // proto enum.
        let agg_mode = match exec.mode() {
            AggregateMode::Partial => protobuf::AggregateMode::Partial,
            AggregateMode::Final => protobuf::AggregateMode::Final,
            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
            AggregateMode::Single => protobuf::AggregateMode::Single,
            AggregateMode::SinglePartitioned => {
                protobuf::AggregateMode::SinglePartitioned
            }
            AggregateMode::PartialReduce => protobuf::AggregateMode::PartialReduce,
        };
        let input_schema = exec.input_schema();
        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
            exec.input().to_owned(),
            codec,
            proto_converter,
        )?;

        // Null expressions associated with the grouping sets (the `.0` of
        // each pair from `null_expr()`).
        let null_expr = exec
            .group_expr()
            .null_expr()
            .iter()
            .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
            .collect::<Result<Vec<_>>>()?;

        // The group-by expressions (the `.0` of each (expr, name) pair).
        let group_expr = exec
            .group_expr()
            .expr()
            .iter()
            .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
            .collect::<Result<Vec<_>>>()?;

        // Optional aggregation limit (e.g. for limited group-by), if set.
        let limit = exec.limit_options().map(|config| protobuf::AggLimit {
            limit: config.limit() as u64,
            descending: config.descending(),
        });

        Ok(protobuf::PhysicalPlanNode {
            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                protobuf::AggregateExecNode {
                    group_expr,
                    group_expr_name: group_names,
                    aggr_expr: agg,
                    filter_expr: filter,
                    aggr_expr_name: agg_names,
                    mode: agg_mode as i32,
                    input: Some(Box::new(input)),
                    // The input schema is serialized explicitly alongside the
                    // input plan node.
                    input_schema: Some(input_schema.as_ref().try_into()?),
                    null_expr,
                    groups,
                    limit,
                    has_grouping_set: exec.group_expr().has_grouping_set(),
                },
            ))),
        })
    }
2797
2798    fn try_from_empty_exec(
2799        empty: &EmptyExec,
2800        _codec: &dyn PhysicalExtensionCodec,
2801    ) -> Result<Self> {
2802        let schema = empty.schema().as_ref().try_into()?;
2803        Ok(protobuf::PhysicalPlanNode {
2804            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2805                schema: Some(schema),
2806            })),
2807        })
2808    }
2809
2810    fn try_from_placeholder_row_exec(
2811        empty: &PlaceholderRowExec,
2812        _codec: &dyn PhysicalExtensionCodec,
2813    ) -> Result<Self> {
2814        let schema = empty.schema().as_ref().try_into()?;
2815        Ok(protobuf::PhysicalPlanNode {
2816            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2817                protobuf::PlaceholderRowExecNode {
2818                    schema: Some(schema),
2819                },
2820            )),
2821        })
2822    }
2823
2824    #[expect(deprecated)]
2825    fn try_from_coalesce_batches_exec(
2826        coalesce_batches: &CoalesceBatchesExec,
2827        codec: &dyn PhysicalExtensionCodec,
2828        proto_converter: &dyn PhysicalProtoConverterExtension,
2829    ) -> Result<Self> {
2830        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2831            coalesce_batches.input().to_owned(),
2832            codec,
2833            proto_converter,
2834        )?;
2835        Ok(protobuf::PhysicalPlanNode {
2836            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2837                protobuf::CoalesceBatchesExecNode {
2838                    input: Some(Box::new(input)),
2839                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2840                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2841                },
2842            ))),
2843        })
2844    }
2845
    /// Serializes a [`DataSourceExec`] into a [`protobuf::PhysicalPlanNode`],
    /// returning `Ok(None)` when the wrapped [`DataSource`] is not one of the
    /// kinds this module knows how to encode (CSV / JSON / Arrow / Parquet /
    /// Avro file scans, or an in-memory source).
    ///
    /// Each branch downcasts the data source, and — for file scans — its
    /// file source, returning early on the first match. Parquet and Avro
    /// handling is feature-gated.
    fn try_from_data_source_exec(
        data_source_exec: &DataSourceExec,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Option<Self>> {
        let data_source = data_source_exec.data_source();
        // CSV: file scan whose file source is a CsvSource; all CSV parsing
        // options (delimiter, quote, escape, comment, ...) are serialized.
        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_csv.file_source();
            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
                        protobuf::CsvScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_csv,
                                codec,
                                proto_converter,
                            )?),
                            has_header: csv_config.has_header(),
                            // Single-byte options are stored as strings in the
                            // proto; `byte_to_string` validates the conversion.
                            delimiter: byte_to_string(
                                csv_config.delimiter(),
                                "delimiter",
                            )?,
                            quote: byte_to_string(csv_config.quote(), "quote")?,
                            optional_escape: if let Some(escape) = csv_config.escape() {
                                Some(
                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                                        byte_to_string(escape, "escape")?,
                                    ),
                                )
                            } else {
                                None
                            },
                            optional_comment: if let Some(comment) = csv_config.comment()
                            {
                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
                                        byte_to_string(comment, "comment")?,
                                    ))
                            } else {
                                None
                            },
                            newlines_in_values: csv_config.newlines_in_values(),
                            truncate_rows: csv_config.truncate_rows(),
                        },
                    )),
                }));
            }
        }

        // JSON: only the base file-scan configuration is needed.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
                        protobuf::JsonScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                codec,
                                proto_converter,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Arrow IPC files: likewise only the base configuration.
        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_arrow_source) = source.as_any().downcast_ref::<ArrowSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::ArrowScan(
                        protobuf::ArrowScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                codec,
                                proto_converter,
                            )?),
                        },
                    )),
                }));
            }
        }

        // Parquet (feature-gated): additionally serializes the optional
        // pushed-down predicate and the table-level parquet options.
        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
        {
            let predicate = conf
                .filter()
                .map(|pred| proto_converter.physical_expr_to_proto(&pred, codec))
                .transpose()?;
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                    protobuf::ParquetScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            maybe_parquet,
                            codec,
                            proto_converter,
                        )?),
                        predicate,
                        parquet_options: Some(conf.table_parquet_options().try_into()?),
                    },
                )),
            }));
        }

        // Avro (feature-gated): only the base configuration.
        #[cfg(feature = "avro")]
        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = maybe_avro.file_source();
            if source.as_any().downcast_ref::<AvroSource>().is_some() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
                        protobuf::AvroScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                maybe_avro,
                                codec,
                                proto_converter,
                            )?),
                        },
                    )),
                }));
            }
        }

        // In-memory source: serializes the record batches themselves plus
        // the original schema, projection, orderings, and display options.
        if let Some(source_conf) =
            data_source.as_any().downcast_ref::<MemorySourceConfig>()
        {
            let proto_partitions = source_conf
                .partitions()
                .iter()
                .map(|p| serialize_record_batches(p))
                .collect::<Result<Vec<_>>>()?;

            // The pre-projection schema; the projection below is applied to
            // it on deserialization.
            let proto_schema: protobuf::Schema =
                source_conf.original_schema().as_ref().try_into()?;

            // Empty vec encodes "no projection" in the proto.
            let proto_projection = source_conf
                .projection()
                .as_ref()
                .map_or_else(Vec::new, |v| {
                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
                });

            // Each known ordering becomes one collection of sort-expr nodes.
            let proto_sort_information = source_conf
                .sort_information()
                .iter()
                .map(|ordering| {
                    let sort_exprs = serialize_physical_sort_exprs(
                        ordering.to_owned(),
                        codec,
                        proto_converter,
                    )?;
                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
                        physical_sort_expr_nodes: sort_exprs,
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
                    protobuf::MemoryScanExecNode {
                        partitions: proto_partitions,
                        schema: Some(proto_schema),
                        projection: proto_projection,
                        sort_information: proto_sort_information,
                        show_sizes: source_conf.show_sizes(),
                        fetch: source_conf.fetch().map(|f| f as u32),
                    },
                )),
            }));
        }

        // Unknown data-source kind — let the caller fall back (e.g. to an
        // extension codec).
        Ok(None)
    }
3019
3020    fn try_from_coalesce_partitions_exec(
3021        exec: &CoalescePartitionsExec,
3022        codec: &dyn PhysicalExtensionCodec,
3023        proto_converter: &dyn PhysicalProtoConverterExtension,
3024    ) -> Result<Self> {
3025        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3026            exec.input().to_owned(),
3027            codec,
3028            proto_converter,
3029        )?;
3030        Ok(protobuf::PhysicalPlanNode {
3031            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
3032                protobuf::CoalescePartitionsExecNode {
3033                    input: Some(Box::new(input)),
3034                    fetch: exec.fetch().map(|f| f as u32),
3035                },
3036            ))),
3037        })
3038    }
3039
3040    fn try_from_repartition_exec(
3041        exec: &RepartitionExec,
3042        codec: &dyn PhysicalExtensionCodec,
3043        proto_converter: &dyn PhysicalProtoConverterExtension,
3044    ) -> Result<Self> {
3045        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3046            exec.input().to_owned(),
3047            codec,
3048            proto_converter,
3049        )?;
3050
3051        let pb_partitioning =
3052            serialize_partitioning(exec.partitioning(), codec, proto_converter)?;
3053
3054        Ok(protobuf::PhysicalPlanNode {
3055            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
3056                protobuf::RepartitionExecNode {
3057                    input: Some(Box::new(input)),
3058                    partitioning: Some(pb_partitioning),
3059                },
3060            ))),
3061        })
3062    }
3063
3064    fn try_from_sort_exec(
3065        exec: &SortExec,
3066        codec: &dyn PhysicalExtensionCodec,
3067        proto_converter: &dyn PhysicalProtoConverterExtension,
3068    ) -> Result<Self> {
3069        let input = proto_converter.execution_plan_to_proto(exec.input(), codec)?;
3070        let expr = exec
3071            .expr()
3072            .iter()
3073            .map(|expr| {
3074                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3075                    expr: Some(Box::new(
3076                        proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3077                    )),
3078                    asc: !expr.options.descending,
3079                    nulls_first: expr.options.nulls_first,
3080                });
3081                Ok(protobuf::PhysicalExprNode {
3082                    expr_id: None,
3083                    expr_type: Some(ExprType::Sort(sort_expr)),
3084                })
3085            })
3086            .collect::<Result<Vec<_>>>()?;
3087        Ok(protobuf::PhysicalPlanNode {
3088            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
3089                protobuf::SortExecNode {
3090                    input: Some(Box::new(input)),
3091                    expr,
3092                    fetch: match exec.fetch() {
3093                        Some(n) => n as i64,
3094                        _ => -1,
3095                    },
3096                    preserve_partitioning: exec.preserve_partitioning(),
3097                },
3098            ))),
3099        })
3100    }
3101
3102    fn try_from_union_exec(
3103        union: &UnionExec,
3104        codec: &dyn PhysicalExtensionCodec,
3105        proto_converter: &dyn PhysicalProtoConverterExtension,
3106    ) -> Result<Self> {
3107        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3108        for input in union.inputs() {
3109            inputs.push(
3110                protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3111                    input.to_owned(),
3112                    codec,
3113                    proto_converter,
3114                )?,
3115            );
3116        }
3117        Ok(protobuf::PhysicalPlanNode {
3118            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
3119                inputs,
3120            })),
3121        })
3122    }
3123
3124    fn try_from_interleave_exec(
3125        interleave: &InterleaveExec,
3126        codec: &dyn PhysicalExtensionCodec,
3127        proto_converter: &dyn PhysicalProtoConverterExtension,
3128    ) -> Result<Self> {
3129        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3130        for input in interleave.inputs() {
3131            inputs.push(
3132                protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3133                    input.to_owned(),
3134                    codec,
3135                    proto_converter,
3136                )?,
3137            );
3138        }
3139        Ok(protobuf::PhysicalPlanNode {
3140            physical_plan_type: Some(PhysicalPlanType::Interleave(
3141                protobuf::InterleaveExecNode { inputs },
3142            )),
3143        })
3144    }
3145
3146    fn try_from_sort_preserving_merge_exec(
3147        exec: &SortPreservingMergeExec,
3148        codec: &dyn PhysicalExtensionCodec,
3149        proto_converter: &dyn PhysicalProtoConverterExtension,
3150    ) -> Result<Self> {
3151        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3152            exec.input().to_owned(),
3153            codec,
3154            proto_converter,
3155        )?;
3156        let expr = exec
3157            .expr()
3158            .iter()
3159            .map(|expr| {
3160                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3161                    expr: Some(Box::new(
3162                        proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3163                    )),
3164                    asc: !expr.options.descending,
3165                    nulls_first: expr.options.nulls_first,
3166                });
3167                Ok(protobuf::PhysicalExprNode {
3168                    expr_id: None,
3169                    expr_type: Some(ExprType::Sort(sort_expr)),
3170                })
3171            })
3172            .collect::<Result<Vec<_>>>()?;
3173        Ok(protobuf::PhysicalPlanNode {
3174            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
3175                protobuf::SortPreservingMergeExecNode {
3176                    input: Some(Box::new(input)),
3177                    expr,
3178                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
3179                },
3180            ))),
3181        })
3182    }
3183
3184    fn try_from_nested_loop_join_exec(
3185        exec: &NestedLoopJoinExec,
3186        codec: &dyn PhysicalExtensionCodec,
3187        proto_converter: &dyn PhysicalProtoConverterExtension,
3188    ) -> Result<Self> {
3189        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3190            exec.left().to_owned(),
3191            codec,
3192            proto_converter,
3193        )?;
3194        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3195            exec.right().to_owned(),
3196            codec,
3197            proto_converter,
3198        )?;
3199
3200        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
3201        let filter = exec
3202            .filter()
3203            .as_ref()
3204            .map(|f| {
3205                let expression =
3206                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
3207                let column_indices = f
3208                    .column_indices()
3209                    .iter()
3210                    .map(|i| {
3211                        let side: protobuf::JoinSide = i.side.to_owned().into();
3212                        protobuf::ColumnIndex {
3213                            index: i.index as u32,
3214                            side: side.into(),
3215                        }
3216                    })
3217                    .collect();
3218                let schema = f.schema().as_ref().try_into()?;
3219                Ok(protobuf::JoinFilter {
3220                    expression: Some(expression),
3221                    column_indices,
3222                    schema: Some(schema),
3223                })
3224            })
3225            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
3226
3227        Ok(protobuf::PhysicalPlanNode {
3228            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
3229                protobuf::NestedLoopJoinExecNode {
3230                    left: Some(Box::new(left)),
3231                    right: Some(Box::new(right)),
3232                    join_type: join_type.into(),
3233                    filter,
3234                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
3235                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
3236                    }),
3237                },
3238            ))),
3239        })
3240    }
3241
3242    fn try_from_window_agg_exec(
3243        exec: &WindowAggExec,
3244        codec: &dyn PhysicalExtensionCodec,
3245        proto_converter: &dyn PhysicalProtoConverterExtension,
3246    ) -> Result<Self> {
3247        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3248            exec.input().to_owned(),
3249            codec,
3250            proto_converter,
3251        )?;
3252
3253        let window_expr = exec
3254            .window_expr()
3255            .iter()
3256            .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3257            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3258
3259        let partition_keys = exec
3260            .partition_keys()
3261            .iter()
3262            .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3263            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3264
3265        Ok(protobuf::PhysicalPlanNode {
3266            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3267                protobuf::WindowAggExecNode {
3268                    input: Some(Box::new(input)),
3269                    window_expr,
3270                    partition_keys,
3271                    input_order_mode: None,
3272                },
3273            ))),
3274        })
3275    }
3276
3277    fn try_from_bounded_window_agg_exec(
3278        exec: &BoundedWindowAggExec,
3279        codec: &dyn PhysicalExtensionCodec,
3280        proto_converter: &dyn PhysicalProtoConverterExtension,
3281    ) -> Result<Self> {
3282        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3283            exec.input().to_owned(),
3284            codec,
3285            proto_converter,
3286        )?;
3287
3288        let window_expr = exec
3289            .window_expr()
3290            .iter()
3291            .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3292            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3293
3294        let partition_keys = exec
3295            .partition_keys()
3296            .iter()
3297            .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3298            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3299
3300        let input_order_mode = match &exec.input_order_mode {
3301            InputOrderMode::Linear => {
3302                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
3303            }
3304            InputOrderMode::PartiallySorted(columns) => {
3305                window_agg_exec_node::InputOrderMode::PartiallySorted(
3306                    protobuf::PartiallySortedInputOrderMode {
3307                        columns: columns.iter().map(|c| *c as u64).collect(),
3308                    },
3309                )
3310            }
3311            InputOrderMode::Sorted => {
3312                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
3313            }
3314        };
3315
3316        Ok(protobuf::PhysicalPlanNode {
3317            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3318                protobuf::WindowAggExecNode {
3319                    input: Some(Box::new(input)),
3320                    window_expr,
3321                    partition_keys,
3322                    input_order_mode: Some(input_order_mode),
3323                },
3324            ))),
3325        })
3326    }
3327
    /// Serializes a [`DataSinkExec`] into a protobuf plan node.
    ///
    /// Returns `Ok(None)` when the wrapped sink is not one of the built-in
    /// sink types (JSON, CSV, or — with the `parquet` feature — Parquet), so
    /// that the caller can fall back to a user-provided extension codec.
    fn try_from_data_sink_exec(
        exec: &DataSinkExec,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Option<Self>> {
        // Recursively serialize the sink's input plan first.
        let input: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
                exec.input().to_owned(),
                codec,
                proto_converter,
            )?;
        // Convert the optional required sort order into its protobuf form.
        let sort_order = match exec.sort_order() {
            Some(requirements) => {
                let expr = requirements
                    .iter()
                    .map(|requirement| {
                        // NOTE(review): each sort *requirement* is first converted
                        // into a PhysicalSortExpr — presumably filling in default
                        // sort options where unspecified; confirm against the
                        // From impl for PhysicalSortExpr.
                        let expr: PhysicalSortExpr = requirement.to_owned().into();
                        let sort_expr = protobuf::PhysicalSortExprNode {
                            expr: Some(Box::new(
                                proto_converter
                                    .physical_expr_to_proto(&expr.expr, codec)?,
                            )),
                            asc: !expr.options.descending,
                            nulls_first: expr.options.nulls_first,
                        };
                        Ok(sort_expr)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Some(protobuf::PhysicalSortExprNodeCollection {
                    physical_sort_expr_nodes: expr,
                })
            }
            None => None,
        };

        // Downcast the opaque `DataSink` to each known concrete sink type
        // in turn; the first match determines the protobuf variant emitted.
        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
                    protobuf::JsonSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
                    protobuf::CsvSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // Parquet support is optional; without the feature this branch does
        // not exist and Parquet sinks fall through to the extension path.
        #[cfg(feature = "parquet")]
        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
                    protobuf::ParquetSinkExecNode {
                        input: Some(Box::new(input)),
                        sink: Some(sink.try_into()?),
                        sink_schema: Some(exec.schema().as_ref().try_into()?),
                        sort_order,
                    },
                ))),
            }));
        }

        // If unknown DataSink then let extension handle it
        Ok(None)
    }
3406
3407    fn try_from_unnest_exec(
3408        exec: &UnnestExec,
3409        codec: &dyn PhysicalExtensionCodec,
3410        proto_converter: &dyn PhysicalProtoConverterExtension,
3411    ) -> Result<Self> {
3412        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3413            exec.input().to_owned(),
3414            codec,
3415            proto_converter,
3416        )?;
3417
3418        Ok(protobuf::PhysicalPlanNode {
3419            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3420                protobuf::UnnestExecNode {
3421                    input: Some(Box::new(input)),
3422                    schema: Some(exec.schema().try_into()?),
3423                    list_type_columns: exec
3424                        .list_column_indices()
3425                        .iter()
3426                        .map(|c| ProtoListUnnest {
3427                            index_in_input_schema: c.index_in_input_schema as _,
3428                            depth: c.depth as _,
3429                        })
3430                        .collect(),
3431                    struct_type_columns: exec
3432                        .struct_column_indices()
3433                        .iter()
3434                        .map(|c| *c as _)
3435                        .collect(),
3436                    options: Some(exec.options().into()),
3437                },
3438            ))),
3439        })
3440    }
3441
3442    fn try_from_cooperative_exec(
3443        exec: &CooperativeExec,
3444        codec: &dyn PhysicalExtensionCodec,
3445        proto_converter: &dyn PhysicalProtoConverterExtension,
3446    ) -> Result<Self> {
3447        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3448            exec.input().to_owned(),
3449            codec,
3450            proto_converter,
3451        )?;
3452
3453        Ok(protobuf::PhysicalPlanNode {
3454            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3455                protobuf::CooperativeExecNode {
3456                    input: Some(Box::new(input)),
3457                },
3458            ))),
3459        })
3460    }
3461
3462    fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3463        match name {
3464            "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3465            "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3466            _ => internal_err!("unknown name: {name}"),
3467        }
3468    }
3469
    /// Attempts to serialize a [`LazyMemoryExec`] into a `GenerateSeries`
    /// protobuf node.
    ///
    /// Returns `Ok(None)` when the exec does not hold exactly one generator,
    /// or when that generator is not one of the recognized generate_series
    /// state types, so the caller can fall back to an extension codec.
    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
        let generators = exec.generators();

        // ensure we only have one generator
        let [generator] = generators.as_slice() else {
            return Ok(None);
        };

        // Acquire read access to the generator's state for downcasting.
        let generator_guard = generator.read();

        // Try to downcast to different generate_series types

        // Case 1: empty series (no rows to produce).
        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                // NOTE(review): Empty carries no batch size, so a fixed 8192 is
                // written — presumably matching the engine default; confirm.
                target_batch_size: 8192, // Default batch size
                args: Some(protobuf::generate_series_node::Args::ContainsNull(
                    protobuf::GenerateSeriesArgsContainsNull {
                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 2: integer series — start/end/step are plain i64 values.
        if let Some(int_64) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<i64>>()
        {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: int_64.batch_size() as u32,
                args: Some(protobuf::generate_series_node::Args::Int64Args(
                    protobuf::GenerateSeriesArgsInt64 {
                        start: *int_64.start(),
                        end: *int_64.end(),
                        step: *int_64.step(),
                        include_end: int_64.include_end(),
                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Case 3: timestamp/date series — the step is a month/day/nanosecond
        // interval, and the presence of a timezone distinguishes timestamp
        // args from date args on the wire.
        if let Some(timestamp_args) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<TimestampValue>>()
        {
            let schema = exec.schema();

            let start = timestamp_args.start().value();
            let end = timestamp_args.end().value();

            let step_value = timestamp_args.step();

            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
                months: step_value.months,
                days: step_value.days,
                nanos: step_value.nanoseconds,
            });
            let include_end = timestamp_args.include_end();
            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;

            let args = match timestamp_args.current().tz_str() {
                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
                    protobuf::GenerateSeriesArgsTimestamp {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                        tz: Some(tz.to_string()),
                    },
                ),
                None => protobuf::generate_series_node::Args::DateArgs(
                    protobuf::GenerateSeriesArgsDate {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                    },
                ),
            };

            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: timestamp_args.batch_size() as u32,
                args: Some(args),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Unrecognized generator type: defer to extension handling.
        Ok(None)
    }
3576
3577    fn try_from_async_func_exec(
3578        exec: &AsyncFuncExec,
3579        codec: &dyn PhysicalExtensionCodec,
3580        proto_converter: &dyn PhysicalProtoConverterExtension,
3581    ) -> Result<Self> {
3582        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3583            Arc::clone(exec.input()),
3584            codec,
3585            proto_converter,
3586        )?;
3587
3588        let mut async_exprs = vec![];
3589        let mut async_expr_names = vec![];
3590
3591        for async_expr in exec.async_exprs() {
3592            async_exprs
3593                .push(proto_converter.physical_expr_to_proto(&async_expr.func, codec)?);
3594            async_expr_names.push(async_expr.name.clone())
3595        }
3596
3597        Ok(protobuf::PhysicalPlanNode {
3598            physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
3599                protobuf::AsyncFuncExecNode {
3600                    input: Some(Box::new(input)),
3601                    async_exprs,
3602                    async_expr_names,
3603                },
3604            ))),
3605        })
3606    }
3607
3608    fn try_from_buffer_exec(
3609        exec: &BufferExec,
3610        extension_codec: &dyn PhysicalExtensionCodec,
3611        proto_converter: &dyn PhysicalProtoConverterExtension,
3612    ) -> Result<Self> {
3613        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3614            Arc::clone(exec.input()),
3615            extension_codec,
3616            proto_converter,
3617        )?;
3618
3619        Ok(protobuf::PhysicalPlanNode {
3620            physical_plan_type: Some(PhysicalPlanType::Buffer(Box::new(
3621                protobuf::BufferExecNode {
3622                    input: Some(Box::new(input)),
3623                    capacity: exec.capacity() as u64,
3624                },
3625            ))),
3626        })
3627    }
3628}
3629
/// Conversion between a protobuf plan representation and runtime
/// [`ExecutionPlan`]s, including raw byte encoding/decoding of the
/// protobuf form.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes `Self` from a protobuf-encoded byte buffer.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` as protobuf bytes into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this protobuf node into an [`ExecutionPlan`], using `ctx`
    /// for runtime state and `codec` for extension/UDF decoding.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Converts an [`ExecutionPlan`] into its protobuf representation,
    /// delegating nodes it does not recognize to `codec`.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
3654
/// Extension point that lets user-defined execution plan nodes, physical
/// expressions, and UDFs (scalar/aggregate/window) participate in protobuf
/// (de)serialization.
///
/// Only the plan-node `try_decode`/`try_encode` pair is required; the
/// expression and UDF hooks default to "not implemented" errors or no-ops.
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom `ExecutionPlan` from `buf`; `inputs` are the already
    /// decoded child plans.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes a custom `ExecutionPlan` into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF by `name`; errors unless overridden.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes extra state for a scalar UDF; no-op unless overridden.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom physical expression from `buf`; `_inputs` are its
    /// already decoded children. Errors unless overridden.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a custom physical expression; errors unless overridden.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF by `name`; errors unless overridden.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes extra state for an aggregate UDF; no-op unless overridden.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF by `name`; errors unless overridden.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes extra state for a window UDF; no-op unless overridden.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
3707
/// Fallback [`PhysicalExtensionCodec`] used when no custom codec is supplied;
/// every plan-node encode/decode attempt fails with a "not implemented" error.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
3710
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
    /// Always errors: the default codec cannot decode extension nodes.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Always errors: the default codec cannot encode extension nodes.
    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }
}
3729
/// Controls the conversion of physical plans and expressions to and from their
/// Protobuf variants. Using this trait, users can perform optimizations on the
/// conversion process or collect performance metrics.
pub trait PhysicalProtoConverterExtension {
    /// Deserializes a protobuf plan node into an [`ExecutionPlan`].
    fn proto_to_execution_plan(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Serializes an [`ExecutionPlan`] into a protobuf plan node.
    fn execution_plan_to_proto(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>;

    /// Deserializes a protobuf expression node against `input_schema`.
    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    /// Serializes a physical expression into a protobuf expression node.
    fn physical_expr_to_proto(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode>;
}
3761
/// DataEncoderTuple captures the position of the encoder
/// in the codec list that was used to encode the data and actual encoded data
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// The position of encoder used to encode data
    /// (to be used for decoding)
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// The opaque bytes produced by the encoder at `encoder_position`.
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
3774
3775pub struct DefaultPhysicalProtoConverter;
3776impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
3777    fn proto_to_execution_plan(
3778        &self,
3779        ctx: &TaskContext,
3780        codec: &dyn PhysicalExtensionCodec,
3781        proto: &protobuf::PhysicalPlanNode,
3782    ) -> Result<Arc<dyn ExecutionPlan>> {
3783        proto.try_into_physical_plan_with_converter(ctx, codec, self)
3784    }
3785
3786    fn execution_plan_to_proto(
3787        &self,
3788        plan: &Arc<dyn ExecutionPlan>,
3789        codec: &dyn PhysicalExtensionCodec,
3790    ) -> Result<protobuf::PhysicalPlanNode>
3791    where
3792        Self: Sized,
3793    {
3794        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3795            Arc::clone(plan),
3796            codec,
3797            self,
3798        )
3799    }
3800
3801    fn proto_to_physical_expr(
3802        &self,
3803        proto: &protobuf::PhysicalExprNode,
3804        ctx: &TaskContext,
3805        input_schema: &Schema,
3806        codec: &dyn PhysicalExtensionCodec,
3807    ) -> Result<Arc<dyn PhysicalExpr>>
3808    where
3809        Self: Sized,
3810    {
3811        // Default implementation calls the free function
3812        parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
3813    }
3814
3815    fn physical_expr_to_proto(
3816        &self,
3817        expr: &Arc<dyn PhysicalExpr>,
3818        codec: &dyn PhysicalExtensionCodec,
3819    ) -> Result<protobuf::PhysicalExprNode> {
3820        serialize_physical_expr_with_converter(expr, codec, self)
3821    }
3822}
3823
/// Internal serializer that adds expr_id to expressions.
/// Created fresh for each serialization operation.
struct DeduplicatingSerializer {
    /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids.
    session_id: u64,
}

impl DeduplicatingSerializer {
    /// Creates a serializer with a freshly drawn random salt, so separate
    /// serialization passes never reuse the same expr_id space.
    fn new() -> Self {
        Self {
            session_id: rand::random(),
        }
    }
}
3838
impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
    /// One-way converter: deserialization is the job of
    /// `DeduplicatingDeserializer`, so this always errors.
    fn proto_to_execution_plan(
        &self,
        _ctx: &TaskContext,
        _codec: &dyn PhysicalExtensionCodec,
        _proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("DeduplicatingSerializer cannot deserialize execution plans")
    }

    /// Serializes `plan`, routing every contained expression through
    /// `physical_expr_to_proto` below so each gets a salted expr_id.
    fn execution_plan_to_proto(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>
    where
        Self: Sized,
    {
        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
            Arc::clone(plan),
            codec,
            self,
        )
    }

    /// One-way converter: always errors (see `DeduplicatingDeserializer`).
    fn proto_to_physical_expr(
        &self,
        _proto: &protobuf::PhysicalExprNode,
        _ctx: &TaskContext,
        _input_schema: &Schema,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>
    where
        Self: Sized,
    {
        internal_err!("DeduplicatingSerializer cannot deserialize physical expressions")
    }

    /// Serializes `expr` and stamps the resulting node with an `expr_id`
    /// derived from the expression's identity (its Arc pointer).
    fn physical_expr_to_proto(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode> {
        let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?;

        // Hash session_id, pointer address, and process ID together to create expr_id.
        // - session_id: random per serializer, prevents collisions when merging serializations
        // - ptr: unique address per Arc within a process
        // - pid: prevents collisions if serializer is shared across processes
        let mut hasher = DefaultHasher::new();
        self.session_id.hash(&mut hasher);
        (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        proto.expr_id = Some(hasher.finish());

        Ok(proto)
    }
}
3897
/// Internal deserializer that caches expressions by expr_id.
/// Created fresh for each deserialization operation.
#[derive(Default)]
struct DeduplicatingDeserializer {
    /// Cache mapping expr_id to deserialized expressions.
    /// RefCell: the trait's methods take `&self`, but deserialization must
    /// insert into the cache (single-threaded interior mutability).
    cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
}
3905
impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
    /// Deserializes a plan node, routing every contained expression through
    /// `proto_to_physical_expr` below so duplicates share one Arc.
    fn proto_to_execution_plan(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto: &protobuf::PhysicalPlanNode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        proto.try_into_physical_plan_with_converter(ctx, codec, self)
    }

    /// One-way converter: serialization is the job of
    /// `DeduplicatingSerializer`, so this always errors.
    fn execution_plan_to_proto(
        &self,
        _plan: &Arc<dyn ExecutionPlan>,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>
    where
        Self: Sized,
    {
        internal_err!("DeduplicatingDeserializer cannot serialize execution plans")
    }

    /// Deserializes an expression, returning a cached Arc when the node
    /// carries an `expr_id` seen earlier in this deserialization pass.
    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn PhysicalExpr>>
    where
        Self: Sized,
    {
        if let Some(expr_id) = proto.expr_id {
            // Check cache first
            // (the borrow ends before the recursive parse below runs)
            if let Some(cached) = self.cache.borrow().get(&expr_id) {
                return Ok(Arc::clone(cached));
            }
            // Deserialize and cache
            let expr = parse_physical_expr_with_converter(
                proto,
                ctx,
                input_schema,
                codec,
                self,
            )?;
            self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr));
            Ok(expr)
        } else {
            // No id: plain deserialization, nothing to deduplicate.
            parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
        }
    }

    /// One-way converter: always errors (see `DeduplicatingSerializer`).
    fn physical_expr_to_proto(
        &self,
        _expr: &Arc<dyn PhysicalExpr>,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode> {
        internal_err!("DeduplicatingDeserializer cannot serialize physical expressions")
    }
}
3965
/// A proto converter that adds expression deduplication during serialization
/// and deserialization.
///
/// During serialization, each expression's Arc pointer address is XORed with a
/// random session_id to create a salted `expr_id`. This prevents cross-process
/// collisions when serialized plans are merged.
///
/// During deserialization, expressions with the same `expr_id` share the same
/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large
/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances.
///
/// This converter is stateless - it creates internal serializers/deserializers
/// on demand for each operation.
///
/// It is zero-sized and derives `Default`, `Clone`, and `Copy`, so instances
/// can be created and passed around freely with no runtime cost.
///
/// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html
#[derive(Debug, Default, Clone, Copy)]
pub struct DeduplicatingProtoConverter {}
3983
3984impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
3985    fn proto_to_execution_plan(
3986        &self,
3987        ctx: &TaskContext,
3988        codec: &dyn PhysicalExtensionCodec,
3989        proto: &protobuf::PhysicalPlanNode,
3990    ) -> Result<Arc<dyn ExecutionPlan>> {
3991        let deserializer = DeduplicatingDeserializer::default();
3992        proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer)
3993    }
3994
3995    fn execution_plan_to_proto(
3996        &self,
3997        plan: &Arc<dyn ExecutionPlan>,
3998        codec: &dyn PhysicalExtensionCodec,
3999    ) -> Result<protobuf::PhysicalPlanNode>
4000    where
4001        Self: Sized,
4002    {
4003        let serializer = DeduplicatingSerializer::new();
4004        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
4005            Arc::clone(plan),
4006            codec,
4007            &serializer,
4008        )
4009    }
4010
4011    fn proto_to_physical_expr(
4012        &self,
4013        proto: &protobuf::PhysicalExprNode,
4014        ctx: &TaskContext,
4015        input_schema: &Schema,
4016        codec: &dyn PhysicalExtensionCodec,
4017    ) -> Result<Arc<dyn PhysicalExpr>>
4018    where
4019        Self: Sized,
4020    {
4021        let deserializer = DeduplicatingDeserializer::default();
4022        deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec)
4023    }
4024
4025    fn physical_expr_to_proto(
4026        &self,
4027        expr: &Arc<dyn PhysicalExpr>,
4028        codec: &dyn PhysicalExtensionCodec,
4029    ) -> Result<protobuf::PhysicalExprNode> {
4030        let serializer = DeduplicatingSerializer::new();
4031        serializer.physical_expr_to_proto(expr, codec)
4032    }
4033}
4034
/// A PhysicalExtensionCodec that tries one of multiple inner codecs
/// until one works
#[derive(Debug)]
pub struct ComposedPhysicalExtensionCodec {
    // Inner codecs, tried in order when encoding. The index of the codec
    // that produced a payload is stored with it, so this order must remain
    // stable for previously encoded data to decode correctly.
    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}
4041
4042impl ComposedPhysicalExtensionCodec {
4043    // Position in this codecs list is important as it will be used for decoding.
4044    // If new codec is added it should go to last position.
4045    pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
4046        Self { codecs }
4047    }
4048
4049    fn decode_protobuf<R>(
4050        &self,
4051        buf: &[u8],
4052        decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
4053    ) -> Result<R> {
4054        let proto =
4055            DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;
4056
4057        let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
4058            internal_datafusion_err!("Can't find required codec in codec list"),
4059        )?;
4060
4061        decode(codec.as_ref(), &proto.blob)
4062    }
4063
4064    fn encode_protobuf(
4065        &self,
4066        buf: &mut Vec<u8>,
4067        mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
4068    ) -> Result<()> {
4069        let mut data = vec![];
4070        let mut last_err = None;
4071        let mut encoder_position = None;
4072
4073        // find the encoder
4074        for (position, codec) in self.codecs.iter().enumerate() {
4075            match encode(codec.as_ref(), &mut data) {
4076                Ok(_) => {
4077                    encoder_position = Some(position as u32);
4078                    break;
4079                }
4080                Err(err) => last_err = Some(err),
4081            }
4082        }
4083
4084        let encoder_position = encoder_position.ok_or_else(|| {
4085            last_err.unwrap_or_else(|| {
4086                DataFusionError::NotImplemented(
4087                    "Empty list of composed codecs".to_owned(),
4088                )
4089            })
4090        })?;
4091
4092        // encode with encoder position
4093        let proto = DataEncoderTuple {
4094            encoder_position,
4095            blob: data,
4096        };
4097        proto
4098            .encode(buf)
4099            .map_err(|e| internal_datafusion_err!("{e}"))
4100    }
4101}
4102
4103impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
4104    fn try_decode(
4105        &self,
4106        buf: &[u8],
4107        inputs: &[Arc<dyn ExecutionPlan>],
4108        ctx: &TaskContext,
4109    ) -> Result<Arc<dyn ExecutionPlan>> {
4110        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
4111    }
4112
4113    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
4114        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
4115    }
4116
4117    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
4118        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
4119    }
4120
4121    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
4122        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
4123    }
4124
4125    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
4126        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
4127    }
4128
4129    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
4130        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
4131    }
4132}
4133
4134fn into_physical_plan(
4135    node: &Option<Box<protobuf::PhysicalPlanNode>>,
4136    ctx: &TaskContext,
4137    codec: &dyn PhysicalExtensionCodec,
4138    proto_converter: &dyn PhysicalProtoConverterExtension,
4139) -> Result<Arc<dyn ExecutionPlan>> {
4140    if let Some(field) = node {
4141        proto_converter.proto_to_execution_plan(ctx, codec, field)
4142    } else {
4143        Err(proto_error("Missing required field in protobuf"))
4144    }
4145}