Skip to main content

datafusion_proto/physical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::cell::RefCell;
20use std::collections::HashMap;
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use arrow::compute::SortOptions;
25use arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
26use datafusion_catalog::memory::MemorySourceConfig;
27use datafusion_common::config::CsvOptions;
28use datafusion_common::{
29    DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
30};
31#[cfg(feature = "parquet")]
32use datafusion_datasource::file::FileSource;
33use datafusion_datasource::file_compression_type::FileCompressionType;
34use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
35use datafusion_datasource::sink::DataSinkExec;
36use datafusion_datasource::source::{DataSource, DataSourceExec};
37use datafusion_datasource_arrow::source::ArrowSource;
38#[cfg(feature = "avro")]
39use datafusion_datasource_avro::source::AvroSource;
40use datafusion_datasource_csv::file_format::CsvSink;
41use datafusion_datasource_csv::source::CsvSource;
42use datafusion_datasource_json::file_format::JsonSink;
43use datafusion_datasource_json::source::JsonSource;
44#[cfg(feature = "parquet")]
45use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
46#[cfg(feature = "parquet")]
47use datafusion_datasource_parquet::file_format::ParquetSink;
48#[cfg(feature = "parquet")]
49use datafusion_datasource_parquet::source::ParquetSource;
50#[cfg(feature = "parquet")]
51use datafusion_execution::object_store::ObjectStoreUrl;
52use datafusion_execution::{FunctionRegistry, TaskContext};
53use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex};
54use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
55use datafusion_functions_table::generate_series::{
56    Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
57};
58use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
59use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
60use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
61use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion_physical_plan::aggregates::{
63    AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy,
64};
65use datafusion_physical_plan::analyze::AnalyzeExec;
66use datafusion_physical_plan::async_func::AsyncFuncExec;
67use datafusion_physical_plan::buffer::BufferExec;
68#[expect(deprecated)]
69use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
70use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
71use datafusion_physical_plan::coop::CooperativeExec;
72use datafusion_physical_plan::empty::EmptyExec;
73use datafusion_physical_plan::explain::ExplainExec;
74use datafusion_physical_plan::expressions::PhysicalSortExpr;
75use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
76use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
77use datafusion_physical_plan::joins::{
78    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
79    StreamJoinPartitionMode, SymmetricHashJoinExec,
80};
81use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
82use datafusion_physical_plan::memory::LazyMemoryExec;
83use datafusion_physical_plan::metrics::{MetricCategory, MetricType};
84use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
85use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
86use datafusion_physical_plan::repartition::RepartitionExec;
87use datafusion_physical_plan::scalar_subquery::{ScalarSubqueryExec, ScalarSubqueryLink};
88use datafusion_physical_plan::sorts::sort::SortExec;
89use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
90use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
91use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
92use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
93use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
94use prost::Message;
95use prost::bytes::BufMut;
96
97use self::from_proto::parse_protobuf_partitioning;
98use self::to_proto::serialize_partitioning;
99use crate::common::{byte_to_string, str_to_byte};
100use crate::physical_plan::from_proto::{
101    parse_physical_expr_with_converter, parse_physical_sort_expr,
102    parse_physical_sort_exprs, parse_physical_window_expr,
103    parse_protobuf_file_scan_config, parse_record_batches, parse_table_schema_from_proto,
104};
105use crate::physical_plan::to_proto::{
106    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
107    serialize_physical_expr_with_converter, serialize_physical_sort_exprs,
108    serialize_physical_window_expr, serialize_record_batches,
109};
110use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
111use crate::protobuf::physical_expr_node::ExprType;
112use crate::protobuf::physical_plan_node::PhysicalPlanType;
113use crate::protobuf::{
114    self, ListUnnest as ProtoListUnnest, SortExprNode, SortMergeJoinExecNode,
115    proto_error, window_agg_exec_node,
116};
117use crate::{convert_required, into_required};
118
119pub mod from_proto;
120pub mod to_proto;
121
/// Marker placed at the start of an encoded `human_display` string. It begins
/// with the ASCII unit separator (`\u{1f}`) so it does not collide with
/// ordinary display text, and carries a version tag so the layout can evolve.
const HUMAN_DISPLAY_ALIAS_PREFIX: &str = "\u{1f}datafusion_human_display_alias_v1:";

/// Packs `alias` in front of `human_display` as
/// `<prefix><alias byte length>:<alias><human_display>`.
///
/// The explicit byte length lets the decoder split the alias off even when
/// the alias itself contains `:` or non-ASCII characters.
fn encode_human_display_alias(human_display: &str, alias: &str) -> String {
    format!(
        "{HUMAN_DISPLAY_ALIAS_PREFIX}{}:{alias}{human_display}",
        alias.len()
    )
}

/// Reverses [`encode_human_display_alias`] for the expected alias `name`.
///
/// Returns `(display, Some(alias))` only when `human_display` carries the
/// prefix, has a parseable length, splits on character boundaries, the
/// decoded alias equals `name`, and the remaining display text is non-empty.
/// In every other case the input is returned untouched with `None`, so
/// malformed or foreign strings are kept literally.
fn split_human_display_alias<'a>(
    human_display: &'a str,
    name: &'a str,
) -> (&'a str, Option<&'a str>) {
    // Decodes the text after the prefix; `None` means "not a valid encoding
    // for this name".
    fn decode<'s>(payload: &'s str, expected: &str) -> Option<(&'s str, &'s str)> {
        let (len_digits, rest) = payload.split_once(':')?;
        let alias_len: usize = len_digits.parse().ok()?;
        // `get` (rather than indexing) rejects out-of-range lengths and
        // lengths that would cut a multi-byte character in half.
        let alias = rest.get(..alias_len)?;
        let display = rest.get(alias_len..)?;
        (alias == expected && !display.is_empty()).then_some((display, alias))
    }

    match human_display
        .strip_prefix(HUMAN_DISPLAY_ALIAS_PREFIX)
        .and_then(|payload| decode(payload, name))
    {
        Some((display, alias)) => (display, Some(alias)),
        None => (human_display, None),
    }
}
148
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn split_human_display_alias_round_trips_matching_alias() {
        let encoded = encode_human_display_alias("sum(value)", "revenue");

        assert_eq!(
            split_human_display_alias(&encoded, "revenue"),
            ("sum(value)", Some("revenue"))
        );
    }

    #[test]
    fn split_human_display_alias_handles_non_ascii_and_colons() {
        // The length field counts bytes, and only the first `:` after the
        // prefix separates the length from the payload, so an alias may
        // contain both multi-byte characters and colons.
        let encoded = encode_human_display_alias("prix(€)", "é:ü");

        assert_eq!(
            split_human_display_alias(&encoded, "é:ü"),
            ("prix(€)", Some("é:ü"))
        );
    }

    #[test]
    fn split_human_display_alias_ignores_mismatched_alias() {
        let encoded = encode_human_display_alias("sum(value)", "revenue");

        assert_eq!(
            split_human_display_alias(&encoded, "other"),
            (encoded.as_str(), None)
        );
    }

    #[test]
    fn split_human_display_alias_keeps_malformed_prefix_literal() {
        let display = format!("{HUMAN_DISPLAY_ALIAS_PREFIX}not-an-encoding");

        assert_eq!(
            split_human_display_alias(&display, "agg"),
            (display.as_str(), None)
        );
    }

    #[test]
    fn split_human_display_alias_keeps_out_of_range_length_literal() {
        let display = format!("{HUMAN_DISPLAY_ALIAS_PREFIX}99:ab");

        assert_eq!(
            split_human_display_alias(&display, "ab"),
            (display.as_str(), None)
        );
    }

    #[test]
    fn split_human_display_alias_passes_through_plain_display() {
        assert_eq!(
            split_human_display_alias("sum(value)", "sum(value)"),
            ("sum(value)", None)
        );
    }

    #[test]
    fn split_human_display_alias_rejects_empty_display() {
        // An encoding whose display part is empty is deliberately not
        // unpacked; the raw encoded string is returned instead.
        let encoded = encode_human_display_alias("", "agg");

        assert_eq!(
            split_human_display_alias(&encoded, "agg"),
            (encoded.as_str(), None)
        );
    }
}
173
174/// Context threaded through physical-plan deserialization.
175///
176/// This bundles the stable per-call inputs for deserialization and the
177/// per-scope `ScalarSubqueryResults` handle needed while reconstructing
178/// `ScalarSubqueryExpr` nodes inside a `ScalarSubqueryExec` input plan.
179#[derive(Clone)]
180pub struct PhysicalPlanDecodeContext<'a> {
181    task_ctx: &'a TaskContext,
182    codec: &'a dyn PhysicalExtensionCodec,
183    scalar_subquery_results: Option<ScalarSubqueryResults>,
184}
185
186impl<'a> PhysicalPlanDecodeContext<'a> {
187    /// Creates a new root decode context.
188    pub fn new(task_ctx: &'a TaskContext, codec: &'a dyn PhysicalExtensionCodec) -> Self {
189        Self {
190            task_ctx,
191            codec,
192            scalar_subquery_results: None,
193        }
194    }
195
196    /// Returns the task context used for deserialization.
197    pub fn task_ctx(&self) -> &'a TaskContext {
198        self.task_ctx
199    }
200
201    /// Returns the physical extension codec used for deserialization.
202    pub fn codec(&self) -> &'a dyn PhysicalExtensionCodec {
203        self.codec
204    }
205
206    /// Returns the scalar subquery results container for the current scope, if
207    /// one is active.
208    pub fn scalar_subquery_results(&self) -> Option<&ScalarSubqueryResults> {
209        self.scalar_subquery_results.as_ref()
210    }
211
212    /// Returns a child context with a different scalar subquery results
213    /// container.
214    pub fn with_scalar_subquery_results(
215        &self,
216        scalar_subquery_results: ScalarSubqueryResults,
217    ) -> Self {
218        Self {
219            task_ctx: self.task_ctx,
220            codec: self.codec,
221            scalar_subquery_results: Some(scalar_subquery_results),
222        }
223    }
224}
225
226impl AsExecutionPlan for protobuf::PhysicalPlanNode {
227    fn try_decode(buf: &[u8]) -> Result<Self>
228    where
229        Self: Sized,
230    {
231        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
232            internal_datafusion_err!("failed to decode physical plan: {e:?}")
233        })
234    }
235
236    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
237    where
238        B: BufMut,
239        Self: Sized,
240    {
241        self.encode(buf).map_err(|e| {
242            internal_datafusion_err!("failed to encode physical plan: {e:?}")
243        })
244    }
245
246    fn try_into_physical_plan(
247        &self,
248        ctx: &TaskContext,
249        codec: &dyn PhysicalExtensionCodec,
250    ) -> Result<Arc<dyn ExecutionPlan>> {
251        self.try_into_physical_plan_with_converter(
252            ctx,
253            codec,
254            &DefaultPhysicalProtoConverter {},
255        )
256    }
257
258    fn try_from_physical_plan(
259        plan: Arc<dyn ExecutionPlan>,
260        codec: &dyn PhysicalExtensionCodec,
261    ) -> Result<Self>
262    where
263        Self: Sized,
264    {
265        Self::try_from_physical_plan_with_converter(
266            plan,
267            codec,
268            &DefaultPhysicalProtoConverter {},
269        )
270    }
271}
272
273impl protobuf::PhysicalPlanNode {
274    pub fn try_into_physical_plan_with_converter(
275        &self,
276        ctx: &TaskContext,
277        codec: &dyn PhysicalExtensionCodec,
278        proto_converter: &dyn PhysicalProtoConverterExtension,
279    ) -> Result<Arc<dyn ExecutionPlan>> {
280        let decode_ctx = PhysicalPlanDecodeContext::new(ctx, codec);
281        self.try_into_physical_plan_with_context(&decode_ctx, proto_converter)
282    }
283
284    pub(crate) fn try_into_physical_plan_with_context(
285        &self,
286        ctx: &PhysicalPlanDecodeContext<'_>,
287        proto_converter: &dyn PhysicalProtoConverterExtension,
288    ) -> Result<Arc<dyn ExecutionPlan>> {
289        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
290            proto_error(format!(
291                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
292            ))
293        })?;
294        match plan {
295            PhysicalPlanType::Explain(explain) => {
296                self.try_into_explain_physical_plan(explain, ctx, proto_converter)
297            }
298            PhysicalPlanType::Projection(projection) => {
299                self.try_into_projection_physical_plan(projection, ctx, proto_converter)
300            }
301            PhysicalPlanType::Filter(filter) => {
302                self.try_into_filter_physical_plan(filter, ctx, proto_converter)
303            }
304            PhysicalPlanType::CsvScan(scan) => {
305                self.try_into_csv_scan_physical_plan(scan, ctx, proto_converter)
306            }
307            PhysicalPlanType::JsonScan(scan) => {
308                self.try_into_json_scan_physical_plan(scan, ctx, proto_converter)
309            }
310            PhysicalPlanType::ParquetScan(scan) => {
311                self.try_into_parquet_scan_physical_plan(scan, ctx, proto_converter)
312            }
313            PhysicalPlanType::AvroScan(scan) => {
314                self.try_into_avro_scan_physical_plan(scan, ctx, proto_converter)
315            }
316            PhysicalPlanType::MemoryScan(scan) => {
317                self.try_into_memory_scan_physical_plan(scan, ctx, proto_converter)
318            }
319            PhysicalPlanType::ArrowScan(scan) => {
320                self.try_into_arrow_scan_physical_plan(scan, ctx, proto_converter)
321            }
322            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
323                .try_into_coalesce_batches_physical_plan(
324                    coalesce_batches,
325                    ctx,
326                    proto_converter,
327                ),
328            PhysicalPlanType::Merge(merge) => {
329                self.try_into_merge_physical_plan(merge, ctx, proto_converter)
330            }
331            PhysicalPlanType::Repartition(repart) => {
332                self.try_into_repartition_physical_plan(repart, ctx, proto_converter)
333            }
334            PhysicalPlanType::GlobalLimit(limit) => {
335                self.try_into_global_limit_physical_plan(limit, ctx, proto_converter)
336            }
337            PhysicalPlanType::LocalLimit(limit) => {
338                self.try_into_local_limit_physical_plan(limit, ctx, proto_converter)
339            }
340            PhysicalPlanType::Window(window_agg) => {
341                self.try_into_window_physical_plan(window_agg, ctx, proto_converter)
342            }
343            PhysicalPlanType::Aggregate(hash_agg) => {
344                self.try_into_aggregate_physical_plan(hash_agg, ctx, proto_converter)
345            }
346            PhysicalPlanType::HashJoin(hashjoin) => {
347                self.try_into_hash_join_physical_plan(hashjoin, ctx, proto_converter)
348            }
349            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
350                .try_into_symmetric_hash_join_physical_plan(
351                    sym_join,
352                    ctx,
353                    proto_converter,
354                ),
355            PhysicalPlanType::Union(union) => {
356                self.try_into_union_physical_plan(union, ctx, proto_converter)
357            }
358            PhysicalPlanType::Interleave(interleave) => {
359                self.try_into_interleave_physical_plan(interleave, ctx, proto_converter)
360            }
361            PhysicalPlanType::CrossJoin(crossjoin) => {
362                self.try_into_cross_join_physical_plan(crossjoin, ctx, proto_converter)
363            }
364            PhysicalPlanType::Empty(empty) => {
365                self.try_into_empty_physical_plan(empty, ctx, proto_converter)
366            }
367            PhysicalPlanType::PlaceholderRow(placeholder) => {
368                self.try_into_placeholder_row_physical_plan(placeholder, ctx)
369            }
370            PhysicalPlanType::Sort(sort) => {
371                self.try_into_sort_physical_plan(sort, ctx, proto_converter)
372            }
373            PhysicalPlanType::SortPreservingMerge(sort) => self
374                .try_into_sort_preserving_merge_physical_plan(sort, ctx, proto_converter),
375            PhysicalPlanType::Extension(extension) => {
376                self.try_into_extension_physical_plan(extension, ctx, proto_converter)
377            }
378            PhysicalPlanType::NestedLoopJoin(join) => {
379                self.try_into_nested_loop_join_physical_plan(join, ctx, proto_converter)
380            }
381            PhysicalPlanType::Analyze(analyze) => {
382                self.try_into_analyze_physical_plan(analyze, ctx, proto_converter)
383            }
384            PhysicalPlanType::JsonSink(sink) => {
385                self.try_into_json_sink_physical_plan(sink, ctx, proto_converter)
386            }
387            PhysicalPlanType::CsvSink(sink) => {
388                self.try_into_csv_sink_physical_plan(sink, ctx, proto_converter)
389            }
390            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
391            PhysicalPlanType::ParquetSink(sink) => {
392                self.try_into_parquet_sink_physical_plan(sink, ctx, proto_converter)
393            }
394            PhysicalPlanType::Unnest(unnest) => {
395                self.try_into_unnest_physical_plan(unnest, ctx, proto_converter)
396            }
397            PhysicalPlanType::Cooperative(cooperative) => {
398                self.try_into_cooperative_physical_plan(cooperative, ctx, proto_converter)
399            }
400            PhysicalPlanType::GenerateSeries(generate_series) => {
401                self.try_into_generate_series_physical_plan(generate_series)
402            }
403            PhysicalPlanType::SortMergeJoin(sort_join) => {
404                self.try_into_sort_join(sort_join, ctx, proto_converter)
405            }
406            PhysicalPlanType::AsyncFunc(async_func) => {
407                self.try_into_async_func_physical_plan(async_func, ctx, proto_converter)
408            }
409            PhysicalPlanType::Buffer(buffer) => {
410                self.try_into_buffer_physical_plan(buffer, ctx, proto_converter)
411            }
412            PhysicalPlanType::ScalarSubquery(sq) => {
413                self.try_into_scalar_subquery_physical_plan(sq, ctx, proto_converter)
414            }
415        }
416    }
417
418    pub fn try_from_physical_plan_with_converter(
419        plan: Arc<dyn ExecutionPlan>,
420        codec: &dyn PhysicalExtensionCodec,
421        proto_converter: &dyn PhysicalProtoConverterExtension,
422    ) -> Result<Self>
423    where
424        Self: Sized,
425    {
426        let plan_clone = Arc::clone(&plan);
427        let plan = plan.as_ref() as &dyn Any;
428
429        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
430            return protobuf::PhysicalPlanNode::try_from_explain_exec(exec, codec);
431        }
432
433        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
434            return protobuf::PhysicalPlanNode::try_from_projection_exec(
435                exec,
436                codec,
437                proto_converter,
438            );
439        }
440
441        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
442            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
443                exec,
444                codec,
445                proto_converter,
446            );
447        }
448
449        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
450            return protobuf::PhysicalPlanNode::try_from_filter_exec(
451                exec,
452                codec,
453                proto_converter,
454            );
455        }
456
457        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
458            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
459                limit,
460                codec,
461                proto_converter,
462            );
463        }
464
465        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
466            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
467                limit,
468                codec,
469                proto_converter,
470            );
471        }
472
473        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
474            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
475                exec,
476                codec,
477                proto_converter,
478            );
479        }
480
481        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
482            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
483                exec,
484                codec,
485                proto_converter,
486            );
487        }
488
489        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
490            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
491                exec,
492                codec,
493                proto_converter,
494            );
495        }
496
497        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
498            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
499                exec,
500                codec,
501                proto_converter,
502            );
503        }
504
505        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
506            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
507                exec,
508                codec,
509                proto_converter,
510            );
511        }
512
513        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
514            return protobuf::PhysicalPlanNode::try_from_empty_exec(empty, codec);
515        }
516
517        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
518            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
519                empty, codec,
520            );
521        }
522
523        #[expect(deprecated)]
524        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
525            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
526                coalesce_batches,
527                codec,
528                proto_converter,
529            );
530        }
531
532        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
533            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
534                data_source_exec,
535                codec,
536                proto_converter,
537            )?
538        {
539            return Ok(node);
540        }
541
542        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
543            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
544                exec,
545                codec,
546                proto_converter,
547            );
548        }
549
550        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
551            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
552                exec,
553                codec,
554                proto_converter,
555            );
556        }
557
558        if let Some(exec) = plan.downcast_ref::<SortExec>() {
559            return protobuf::PhysicalPlanNode::try_from_sort_exec(
560                exec,
561                codec,
562                proto_converter,
563            );
564        }
565
566        if let Some(union) = plan.downcast_ref::<UnionExec>() {
567            return protobuf::PhysicalPlanNode::try_from_union_exec(
568                union,
569                codec,
570                proto_converter,
571            );
572        }
573
574        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
575            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
576                interleave,
577                codec,
578                proto_converter,
579            );
580        }
581
582        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
583            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
584                exec,
585                codec,
586                proto_converter,
587            );
588        }
589
590        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
591            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
592                exec,
593                codec,
594                proto_converter,
595            );
596        }
597
598        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
599            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
600                exec,
601                codec,
602                proto_converter,
603            );
604        }
605
606        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
607            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
608                exec,
609                codec,
610                proto_converter,
611            );
612        }
613
614        if let Some(exec) = plan.downcast_ref::<DataSinkExec>()
615            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
616                exec,
617                codec,
618                proto_converter,
619            )?
620        {
621            return Ok(node);
622        }
623
624        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
625            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
626                exec,
627                codec,
628                proto_converter,
629            );
630        }
631
632        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
633            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
634                exec,
635                codec,
636                proto_converter,
637            );
638        }
639
640        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>()
641            && let Some(node) =
642                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
643        {
644            return Ok(node);
645        }
646
647        if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
648            return protobuf::PhysicalPlanNode::try_from_async_func_exec(
649                exec,
650                codec,
651                proto_converter,
652            );
653        }
654
655        if let Some(exec) = plan.downcast_ref::<BufferExec>() {
656            return protobuf::PhysicalPlanNode::try_from_buffer_exec(
657                exec,
658                codec,
659                proto_converter,
660            );
661        }
662
663        if let Some(exec) = plan.downcast_ref::<ScalarSubqueryExec>() {
664            return protobuf::PhysicalPlanNode::try_from_scalar_subquery_exec(
665                exec,
666                codec,
667                proto_converter,
668            );
669        }
670
671        let mut buf: Vec<u8> = vec![];
672        match codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
673            Ok(_) => {
674                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
675                    .children()
676                    .into_iter()
677                    .cloned()
678                    .map(|i| {
679                        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
680                            i,
681                            codec,
682                            proto_converter,
683                        )
684                    })
685                    .collect::<Result<_>>()?;
686
687                Ok(protobuf::PhysicalPlanNode {
688                    physical_plan_type: Some(PhysicalPlanType::Extension(
689                        protobuf::PhysicalExtensionNode { node: buf, inputs },
690                    )),
691                })
692            }
693            Err(e) => internal_err!(
694                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
695            ),
696        }
697    }
698}
699
700impl protobuf::PhysicalPlanNode {
701    fn try_into_explain_physical_plan(
702        &self,
703        explain: &protobuf::ExplainExecNode,
704        _ctx: &PhysicalPlanDecodeContext<'_>,
705        _proto_converter: &dyn PhysicalProtoConverterExtension,
706    ) -> Result<Arc<dyn ExecutionPlan>> {
707        Ok(Arc::new(ExplainExec::new(
708            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
709            explain
710                .stringified_plans
711                .iter()
712                .map(|plan| plan.into())
713                .collect(),
714            explain.verbose,
715        )))
716    }
717
718    fn try_into_projection_physical_plan(
719        &self,
720        projection: &protobuf::ProjectionExecNode,
721        ctx: &PhysicalPlanDecodeContext<'_>,
722        proto_converter: &dyn PhysicalProtoConverterExtension,
723    ) -> Result<Arc<dyn ExecutionPlan>> {
724        let input: Arc<dyn ExecutionPlan> =
725            into_physical_plan(&projection.input, ctx, proto_converter)?;
726        let exprs = projection
727            .expr
728            .iter()
729            .zip(projection.expr_name.iter())
730            .map(|(expr, name)| {
731                Ok((
732                    proto_converter.proto_to_physical_expr(
733                        expr,
734                        input.schema().as_ref(),
735                        ctx,
736                    )?,
737                    name.to_string(),
738                ))
739            })
740            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
741        let proj_exprs: Vec<ProjectionExpr> = exprs
742            .into_iter()
743            .map(|(expr, alias)| ProjectionExpr { expr, alias })
744            .collect();
745        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
746    }
747
748    fn try_into_filter_physical_plan(
749        &self,
750        filter: &protobuf::FilterExecNode,
751        ctx: &PhysicalPlanDecodeContext<'_>,
752        proto_converter: &dyn PhysicalProtoConverterExtension,
753    ) -> Result<Arc<dyn ExecutionPlan>> {
754        let input: Arc<dyn ExecutionPlan> =
755            into_physical_plan(&filter.input, ctx, proto_converter)?;
756
757        let predicate = filter
758            .expr
759            .as_ref()
760            .map(|expr| {
761                proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
762            })
763            .transpose()?
764            .ok_or_else(|| {
765                internal_datafusion_err!(
766                    "filter (FilterExecNode) in PhysicalPlanNode is missing."
767                )
768            })?;
769
770        let filter_selectivity = filter.default_filter_selectivity.try_into();
771        // Preserve the `None` state across proto boundaries. Proto cannot distinguish
772        // between `None` (full projection) and `Some(vec![])` (empty projection) since
773        // both serialize as an empty list. If all columns are included, we reconstruct
774        // `None` to avoid losing this semantic distinction on deserialization.
775        let num_fields = input.schema().fields().len();
776        let mut is_full_projection = filter.projection.len() == num_fields;
777        let mut projection_vec: Vec<usize> = Vec::with_capacity(filter.projection.len());
778        for (i, idx) in filter.projection.iter().enumerate() {
779            let idx = *idx as usize;
780            is_full_projection &= idx == i;
781            projection_vec.push(idx);
782        }
783        let projection = if is_full_projection {
784            None
785        } else {
786            Some(projection_vec)
787        };
788        let filter = FilterExecBuilder::new(predicate, input)
789            .apply_projection(projection)?
790            .with_batch_size(filter.batch_size as usize)
791            .with_fetch(filter.fetch.map(|f| f as usize))
792            .build()?;
793        match filter_selectivity {
794            Ok(filter_selectivity) => Ok(Arc::new(
795                filter.with_default_selectivity(filter_selectivity)?,
796            )),
797            Err(_) => Err(internal_datafusion_err!(
798                "filter_selectivity in PhysicalPlanNode is invalid "
799            )),
800        }
801    }
802
803    fn try_into_csv_scan_physical_plan(
804        &self,
805        scan: &protobuf::CsvScanExecNode,
806        ctx: &PhysicalPlanDecodeContext<'_>,
807        proto_converter: &dyn PhysicalProtoConverterExtension,
808    ) -> Result<Arc<dyn ExecutionPlan>> {
809        let escape =
810            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
811                &scan.optional_escape
812            {
813                Some(str_to_byte(escape, "escape")?)
814            } else {
815                None
816            };
817
818        let comment = if let Some(
819            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
820        ) = &scan.optional_comment
821        {
822            Some(str_to_byte(comment, "comment")?)
823        } else {
824            None
825        };
826
827        // Parse table schema with partition columns
828        let table_schema =
829            parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
830
831        let csv_options = CsvOptions {
832            has_header: Some(scan.has_header),
833            delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
834            quote: str_to_byte(&scan.quote, "quote")?,
835            newlines_in_values: Some(scan.newlines_in_values),
836            ..Default::default()
837        };
838        let source = Arc::new(
839            CsvSource::new(table_schema)
840                .with_csv_options(csv_options)
841                .with_escape(escape)
842                .with_comment(comment),
843        );
844
845        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
846            scan.base_conf.as_ref().unwrap(),
847            ctx,
848            proto_converter,
849            source,
850        )?)
851        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
852        .build();
853        Ok(DataSourceExec::from_data_source(conf))
854    }
855
856    fn try_into_json_scan_physical_plan(
857        &self,
858        scan: &protobuf::JsonScanExecNode,
859        ctx: &PhysicalPlanDecodeContext<'_>,
860        proto_converter: &dyn PhysicalProtoConverterExtension,
861    ) -> Result<Arc<dyn ExecutionPlan>> {
862        let base_conf = scan.base_conf.as_ref().unwrap();
863        let table_schema = parse_table_schema_from_proto(base_conf)?;
864        let scan_conf = parse_protobuf_file_scan_config(
865            base_conf,
866            ctx,
867            proto_converter,
868            Arc::new(JsonSource::new(table_schema)),
869        )?;
870        Ok(DataSourceExec::from_data_source(scan_conf))
871    }
872
873    fn try_into_arrow_scan_physical_plan(
874        &self,
875        scan: &protobuf::ArrowScanExecNode,
876        ctx: &PhysicalPlanDecodeContext<'_>,
877        proto_converter: &dyn PhysicalProtoConverterExtension,
878    ) -> Result<Arc<dyn ExecutionPlan>> {
879        let base_conf = scan.base_conf.as_ref().ok_or_else(|| {
880            internal_datafusion_err!("base_conf in ArrowScanExecNode is missing.")
881        })?;
882        let table_schema = parse_table_schema_from_proto(base_conf)?;
883        let scan_conf = parse_protobuf_file_scan_config(
884            base_conf,
885            ctx,
886            proto_converter,
887            Arc::new(ArrowSource::new_file_source(table_schema)),
888        )?;
889        Ok(DataSourceExec::from_data_source(scan_conf))
890    }
891
892    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
893    fn try_into_parquet_scan_physical_plan(
894        &self,
895        scan: &protobuf::ParquetScanExecNode,
896        ctx: &PhysicalPlanDecodeContext<'_>,
897        proto_converter: &dyn PhysicalProtoConverterExtension,
898    ) -> Result<Arc<dyn ExecutionPlan>> {
899        #[cfg(feature = "parquet")]
900        {
901            let schema = from_proto::parse_protobuf_file_scan_schema(
902                scan.base_conf.as_ref().unwrap(),
903            )?;
904
905            // Check if there's a projection and use projected schema for predicate parsing
906            let base_conf = scan.base_conf.as_ref().unwrap();
907            let predicate_schema = if !base_conf.projection.is_empty() {
908                // Create projected schema for parsing the predicate
909                let projected_fields: Vec<_> = base_conf
910                    .projection
911                    .iter()
912                    .map(|&i| schema.field(i as usize).clone())
913                    .collect();
914                Arc::new(Schema::new(projected_fields))
915            } else {
916                schema
917            };
918
919            let predicate = scan
920                .predicate
921                .as_ref()
922                .map(|expr| {
923                    proto_converter.proto_to_physical_expr(
924                        expr,
925                        predicate_schema.as_ref(),
926                        ctx,
927                    )
928                })
929                .transpose()?;
930            let mut options = datafusion_common::config::TableParquetOptions::default();
931
932            if let Some(table_options) = scan.parquet_options.as_ref() {
933                options = table_options.try_into()?;
934            }
935
936            // Parse table schema with partition columns
937            let table_schema = parse_table_schema_from_proto(base_conf)?;
938            let object_store_url = match base_conf.object_store_url.is_empty() {
939                false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
940                true => ObjectStoreUrl::local_filesystem(),
941            };
942            let store = ctx
943                .task_ctx()
944                .runtime_env()
945                .object_store(object_store_url)?;
946            let metadata_cache = ctx
947                .task_ctx()
948                .runtime_env()
949                .cache_manager
950                .get_file_metadata_cache();
951            let reader_factory =
952                Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
953
954            let mut source = ParquetSource::new(table_schema)
955                .with_parquet_file_reader_factory(reader_factory)
956                .with_table_parquet_options(options);
957
958            if let Some(predicate) = predicate {
959                source = source.with_predicate(predicate);
960            }
961            let base_config = parse_protobuf_file_scan_config(
962                base_conf,
963                ctx,
964                proto_converter,
965                Arc::new(source),
966            )?;
967            Ok(DataSourceExec::from_data_source(base_config))
968        }
969        #[cfg(not(feature = "parquet"))]
970        panic!(
971            "Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled"
972        )
973    }
974
975    #[cfg_attr(not(feature = "avro"), expect(unused_variables))]
976    fn try_into_avro_scan_physical_plan(
977        &self,
978        scan: &protobuf::AvroScanExecNode,
979        ctx: &PhysicalPlanDecodeContext<'_>,
980        proto_converter: &dyn PhysicalProtoConverterExtension,
981    ) -> Result<Arc<dyn ExecutionPlan>> {
982        #[cfg(feature = "avro")]
983        {
984            let table_schema =
985                parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
986            let conf = parse_protobuf_file_scan_config(
987                scan.base_conf.as_ref().unwrap(),
988                ctx,
989                proto_converter,
990                Arc::new(AvroSource::new(table_schema)),
991            )?;
992            Ok(DataSourceExec::from_data_source(conf))
993        }
994
995        #[cfg(not(feature = "avro"))]
996        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
997    }
998
999    fn try_into_memory_scan_physical_plan(
1000        &self,
1001        scan: &protobuf::MemoryScanExecNode,
1002        ctx: &PhysicalPlanDecodeContext<'_>,
1003        proto_converter: &dyn PhysicalProtoConverterExtension,
1004    ) -> Result<Arc<dyn ExecutionPlan>> {
1005        let partitions = scan
1006            .partitions
1007            .iter()
1008            .map(|p| parse_record_batches(p))
1009            .collect::<Result<Vec<_>>>()?;
1010
1011        let proto_schema = scan.schema.as_ref().ok_or_else(|| {
1012            internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
1013        })?;
1014        let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
1015
1016        let projection = if !scan.projection.is_empty() {
1017            Some(
1018                scan.projection
1019                    .iter()
1020                    .map(|i| *i as usize)
1021                    .collect::<Vec<_>>(),
1022            )
1023        } else {
1024            None
1025        };
1026
1027        let mut sort_information = vec![];
1028        for ordering in &scan.sort_information {
1029            let sort_exprs = parse_physical_sort_exprs(
1030                &ordering.physical_sort_expr_nodes,
1031                ctx,
1032                &schema,
1033                proto_converter,
1034            )?;
1035            sort_information.extend(LexOrdering::new(sort_exprs));
1036        }
1037
1038        let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
1039            .with_limit(scan.fetch.map(|f| f as usize))
1040            .with_show_sizes(scan.show_sizes);
1041
1042        let source = source.try_with_sort_information(sort_information)?;
1043
1044        Ok(DataSourceExec::from_data_source(source))
1045    }
1046
1047    fn try_into_coalesce_batches_physical_plan(
1048        &self,
1049        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
1050        ctx: &PhysicalPlanDecodeContext<'_>,
1051        proto_converter: &dyn PhysicalProtoConverterExtension,
1052    ) -> Result<Arc<dyn ExecutionPlan>> {
1053        let input: Arc<dyn ExecutionPlan> =
1054            into_physical_plan(&coalesce_batches.input, ctx, proto_converter)?;
1055        Ok(Arc::new(
1056            #[expect(deprecated)]
1057            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
1058                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
1059        ))
1060    }
1061
1062    fn try_into_merge_physical_plan(
1063        &self,
1064        merge: &protobuf::CoalescePartitionsExecNode,
1065        ctx: &PhysicalPlanDecodeContext<'_>,
1066        proto_converter: &dyn PhysicalProtoConverterExtension,
1067    ) -> Result<Arc<dyn ExecutionPlan>> {
1068        let input: Arc<dyn ExecutionPlan> =
1069            into_physical_plan(&merge.input, ctx, proto_converter)?;
1070        Ok(Arc::new(
1071            CoalescePartitionsExec::new(input)
1072                .with_fetch(merge.fetch.map(|f| f as usize)),
1073        ))
1074    }
1075
1076    fn try_into_repartition_physical_plan(
1077        &self,
1078        repart: &protobuf::RepartitionExecNode,
1079        ctx: &PhysicalPlanDecodeContext<'_>,
1080        proto_converter: &dyn PhysicalProtoConverterExtension,
1081    ) -> Result<Arc<dyn ExecutionPlan>> {
1082        let input: Arc<dyn ExecutionPlan> =
1083            into_physical_plan(&repart.input, ctx, proto_converter)?;
1084        let partitioning = parse_protobuf_partitioning(
1085            repart.partitioning.as_ref(),
1086            ctx,
1087            input.schema().as_ref(),
1088            proto_converter,
1089        )?;
1090        let mut repart_exec = RepartitionExec::try_new(input, partitioning.unwrap())?;
1091        if repart.preserve_order {
1092            repart_exec = repart_exec.with_preserve_order();
1093        }
1094        Ok(Arc::new(repart_exec))
1095    }
1096
1097    fn try_into_global_limit_physical_plan(
1098        &self,
1099        limit: &protobuf::GlobalLimitExecNode,
1100        ctx: &PhysicalPlanDecodeContext<'_>,
1101        proto_converter: &dyn PhysicalProtoConverterExtension,
1102    ) -> Result<Arc<dyn ExecutionPlan>> {
1103        let input: Arc<dyn ExecutionPlan> =
1104            into_physical_plan(&limit.input, ctx, proto_converter)?;
1105        let fetch = if limit.fetch >= 0 {
1106            Some(limit.fetch as usize)
1107        } else {
1108            None
1109        };
1110        Ok(Arc::new(GlobalLimitExec::new(
1111            input,
1112            limit.skip as usize,
1113            fetch,
1114        )))
1115    }
1116
1117    fn try_into_local_limit_physical_plan(
1118        &self,
1119        limit: &protobuf::LocalLimitExecNode,
1120        ctx: &PhysicalPlanDecodeContext<'_>,
1121        proto_converter: &dyn PhysicalProtoConverterExtension,
1122    ) -> Result<Arc<dyn ExecutionPlan>> {
1123        let input: Arc<dyn ExecutionPlan> =
1124            into_physical_plan(&limit.input, ctx, proto_converter)?;
1125        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
1126    }
1127
1128    fn try_into_window_physical_plan(
1129        &self,
1130        window_agg: &protobuf::WindowAggExecNode,
1131        ctx: &PhysicalPlanDecodeContext<'_>,
1132        proto_converter: &dyn PhysicalProtoConverterExtension,
1133    ) -> Result<Arc<dyn ExecutionPlan>> {
1134        let input: Arc<dyn ExecutionPlan> =
1135            into_physical_plan(&window_agg.input, ctx, proto_converter)?;
1136        let input_schema = input.schema();
1137
1138        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
1139            .window_expr
1140            .iter()
1141            .map(|window_expr| {
1142                parse_physical_window_expr(
1143                    window_expr,
1144                    ctx,
1145                    input_schema.as_ref(),
1146                    proto_converter,
1147                )
1148            })
1149            .collect::<Result<Vec<_>, _>>()?;
1150
1151        let partition_keys = window_agg
1152            .partition_keys
1153            .iter()
1154            .map(|expr| {
1155                proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
1156            })
1157            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
1158
1159        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
1160            let input_order_mode = match input_order_mode {
1161                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
1162                window_agg_exec_node::InputOrderMode::PartiallySorted(
1163                    protobuf::PartiallySortedInputOrderMode { columns },
1164                ) => InputOrderMode::PartiallySorted(
1165                    columns.iter().map(|c| *c as usize).collect(),
1166                ),
1167                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
1168            };
1169
1170            Ok(Arc::new(BoundedWindowAggExec::try_new(
1171                physical_window_expr,
1172                input,
1173                input_order_mode,
1174                !partition_keys.is_empty(),
1175            )?))
1176        } else {
1177            Ok(Arc::new(WindowAggExec::try_new(
1178                physical_window_expr,
1179                input,
1180                !partition_keys.is_empty(),
1181            )?))
1182        }
1183    }
1184
1185    fn try_into_aggregate_physical_plan(
1186        &self,
1187        hash_agg: &protobuf::AggregateExecNode,
1188        ctx: &PhysicalPlanDecodeContext<'_>,
1189        proto_converter: &dyn PhysicalProtoConverterExtension,
1190    ) -> Result<Arc<dyn ExecutionPlan>> {
1191        let input: Arc<dyn ExecutionPlan> =
1192            into_physical_plan(&hash_agg.input, ctx, proto_converter)?;
1193        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
1194            proto_error(format!(
1195                "Received a AggregateNode message with unknown AggregateMode {}",
1196                hash_agg.mode
1197            ))
1198        })?;
1199        let agg_mode: AggregateMode = match mode {
1200            protobuf::AggregateMode::Partial => AggregateMode::Partial,
1201            protobuf::AggregateMode::Final => AggregateMode::Final,
1202            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
1203            protobuf::AggregateMode::Single => AggregateMode::Single,
1204            protobuf::AggregateMode::SinglePartitioned => {
1205                AggregateMode::SinglePartitioned
1206            }
1207            protobuf::AggregateMode::PartialReduce => AggregateMode::PartialReduce,
1208        };
1209
1210        let num_expr = hash_agg.group_expr.len();
1211
1212        let group_expr = hash_agg
1213            .group_expr
1214            .iter()
1215            .zip(hash_agg.group_expr_name.iter())
1216            .map(|(expr, name)| {
1217                proto_converter
1218                    .proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
1219                    .map(|expr| (expr, name.to_string()))
1220            })
1221            .collect::<Result<Vec<_>, _>>()?;
1222
1223        let null_expr = hash_agg
1224            .null_expr
1225            .iter()
1226            .zip(hash_agg.group_expr_name.iter())
1227            .map(|(expr, name)| {
1228                proto_converter
1229                    .proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
1230                    .map(|expr| (expr, name.to_string()))
1231            })
1232            .collect::<Result<Vec<_>, _>>()?;
1233
1234        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
1235            hash_agg
1236                .groups
1237                .chunks(num_expr)
1238                .map(|g| g.to_vec())
1239                .collect::<Vec<Vec<bool>>>()
1240        } else {
1241            vec![]
1242        };
1243
1244        let has_grouping_set = hash_agg.has_grouping_set;
1245
1246        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
1247            internal_datafusion_err!("input_schema in AggregateNode is missing.")
1248        })?;
1249        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
1250
1251        let physical_filter_expr = hash_agg
1252            .filter_expr
1253            .iter()
1254            .map(|expr| {
1255                expr.expr
1256                    .as_ref()
1257                    .map(|e| {
1258                        proto_converter.proto_to_physical_expr(e, &physical_schema, ctx)
1259                    })
1260                    .transpose()
1261            })
1262            .collect::<Result<Vec<_>, _>>()?;
1263
1264        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1265            .aggr_expr
1266            .iter()
1267            .zip(hash_agg.aggr_expr_name.iter())
1268            .map(|(expr, name)| {
1269                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1270                    proto_error("Unexpected empty aggregate physical expression")
1271                })?;
1272
1273                match expr_type {
1274                    ExprType::AggregateExpr(agg_node) => {
1275                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1276                            .expr
1277                            .iter()
1278                            .map(|e| {
1279                                proto_converter.proto_to_physical_expr(
1280                                    e,
1281                                    &physical_schema,
1282                                    ctx,
1283                                )
1284                            })
1285                            .collect::<Result<Vec<_>>>()?;
1286                        let order_bys = agg_node
1287                            .ordering_req
1288                            .iter()
1289                            .map(|e| {
1290                                parse_physical_sort_expr(
1291                                    e,
1292                                    ctx,
1293                                    &physical_schema,
1294                                    proto_converter,
1295                                )
1296                            })
1297                            .collect::<Result<_>>()?;
1298                        agg_node
1299                            .aggregate_function
1300                            .as_ref()
1301                            .map(|func| match func {
1302                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1303                                    let agg_udf = match &agg_node.fun_definition {
1304                                        Some(buf) => {
1305                                            ctx.codec().try_decode_udaf(udaf_name, buf)?
1306                                        }
1307                                        None => ctx.task_ctx().udaf(udaf_name).or_else(
1308                                            |_| {
1309                                                ctx.codec()
1310                                                    .try_decode_udaf(udaf_name, &[])
1311                                            },
1312                                        )?,
1313                                    };
1314
1315                                    let (human_display, human_display_alias) =
1316                                        split_human_display_alias(
1317                                            &agg_node.human_display,
1318                                            name,
1319                                        );
1320                                    let builder = AggregateExprBuilder::new(
1321                                        agg_udf,
1322                                        input_phy_expr,
1323                                    )
1324                                    .schema(Arc::clone(&physical_schema))
1325                                    .alias(name)
1326                                    .with_ignore_nulls(agg_node.ignore_nulls)
1327                                    .with_distinct(agg_node.distinct)
1328                                    .order_by(order_bys)
1329                                    .human_display(human_display);
1330                                    let builder = if let Some(alias) = human_display_alias
1331                                    {
1332                                        builder.human_display_alias(alias)
1333                                    } else {
1334                                        builder
1335                                    };
1336                                    builder.build().map(Arc::new)
1337                                }
1338                            })
1339                            .transpose()?
1340                            .ok_or_else(|| {
1341                                proto_error(
1342                                    "Invalid AggregateExpr, missing aggregate_function",
1343                                )
1344                            })
1345                    }
1346                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1347                }
1348            })
1349            .collect::<Result<Vec<_>, _>>()?;
1350
1351        let physical_schema_ref = Arc::clone(&physical_schema);
1352        let agg = AggregateExec::try_new(
1353            agg_mode,
1354            PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set),
1355            physical_aggr_expr,
1356            physical_filter_expr,
1357            input,
1358            physical_schema,
1359        )?;
1360
1361        let agg = if let Some(limit_proto) = &hash_agg.limit {
1362            let limit = limit_proto.limit as usize;
1363            let limit_options = match limit_proto.descending {
1364                Some(descending) => LimitOptions::new_with_order(limit, descending),
1365                None => LimitOptions::new(limit),
1366            };
1367            agg.with_limit_options(Some(limit_options))
1368        } else {
1369            agg
1370        };
1371
1372        let agg = if let Some(dynamic_filter_proto) = &hash_agg.dynamic_filter {
1373            let dynamic_filter_expr = proto_converter.proto_to_physical_expr(
1374                dynamic_filter_proto,
1375                physical_schema_ref.as_ref(),
1376                ctx,
1377            )?;
1378            let df = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
1379                .downcast::<DynamicFilterPhysicalExpr>()
1380                .map_err(|_| {
1381                    internal_datafusion_err!(
1382                        "AggregateExec dynamic_filter did not decode to a DynamicFilterPhysicalExpr"
1383                    )
1384                })?;
1385            agg.with_dynamic_filter_expr(df)?
1386        } else {
1387            agg
1388        };
1389
1390        Ok(Arc::new(agg))
1391    }
1392
1393    fn try_into_hash_join_physical_plan(
1394        &self,
1395        hashjoin: &protobuf::HashJoinExecNode,
1396        ctx: &PhysicalPlanDecodeContext<'_>,
1397        proto_converter: &dyn PhysicalProtoConverterExtension,
1398    ) -> Result<Arc<dyn ExecutionPlan>> {
1399        let left: Arc<dyn ExecutionPlan> =
1400            into_physical_plan(&hashjoin.left, ctx, proto_converter)?;
1401        let right: Arc<dyn ExecutionPlan> =
1402            into_physical_plan(&hashjoin.right, ctx, proto_converter)?;
1403        let left_schema = left.schema();
1404        let right_schema = right.schema();
1405        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1406            .on
1407            .iter()
1408            .map(|col| {
1409                let left = proto_converter.proto_to_physical_expr(
1410                    &col.left.clone().unwrap(),
1411                    left_schema.as_ref(),
1412                    ctx,
1413                )?;
1414                let right = proto_converter.proto_to_physical_expr(
1415                    &col.right.clone().unwrap(),
1416                    right_schema.as_ref(),
1417                    ctx,
1418                )?;
1419                Ok((left, right))
1420            })
1421            .collect::<Result<_>>()?;
1422        let join_type =
1423            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1424                proto_error(format!(
1425                    "Received a HashJoinNode message with unknown JoinType {}",
1426                    hashjoin.join_type
1427                ))
1428            })?;
1429        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1430            .map_err(|_| {
1431                proto_error(format!(
1432                    "Received a HashJoinNode message with unknown NullEquality {}",
1433                    hashjoin.null_equality
1434                ))
1435            })?;
1436        let filter = hashjoin
1437            .filter
1438            .as_ref()
1439            .map(|f| {
1440                let schema = f
1441                    .schema
1442                    .as_ref()
1443                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1444                    .try_into()?;
1445
1446                let expression = proto_converter.proto_to_physical_expr(
1447                    f.expression.as_ref().ok_or_else(|| {
1448                        proto_error("Unexpected empty filter expression")
1449                    })?,
1450                    &schema,
1451                    ctx,
1452                )?;
1453                let column_indices = f.column_indices
1454                    .iter()
1455                    .map(|i| {
1456                        let side = protobuf::JoinSide::try_from(i.side)
1457                            .map_err(|_| proto_error(format!(
1458                                "Received a HashJoinNode message with JoinSide in Filter {}",
1459                                i.side))
1460                            )?;
1461
1462                        Ok(ColumnIndex {
1463                            index: i.index as usize,
1464                            side: side.into(),
1465                        })
1466                    })
1467                    .collect::<Result<Vec<_>>>()?;
1468
1469                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1470            })
1471            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1472
1473        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1474            .map_err(|_| {
1475            proto_error(format!(
1476                "Received a HashJoinNode message with unknown PartitionMode {}",
1477                hashjoin.partition_mode
1478            ))
1479        })?;
1480        let partition_mode = match partition_mode {
1481            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1482            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1483            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1484        };
1485        let projection = if !hashjoin.projection.is_empty() {
1486            Some(
1487                hashjoin
1488                    .projection
1489                    .iter()
1490                    .map(|i| *i as usize)
1491                    .collect::<Vec<_>>(),
1492            )
1493        } else {
1494            None
1495        };
1496        let mut hash_join = HashJoinExec::try_new(
1497            left,
1498            right,
1499            on,
1500            filter,
1501            &join_type.into(),
1502            projection,
1503            partition_mode,
1504            null_equality.into(),
1505            hashjoin.null_aware,
1506        )?;
1507
1508        if let Some(dynamic_filter_proto) = &hashjoin.dynamic_filter {
1509            let dynamic_filter_expr = proto_converter.proto_to_physical_expr(
1510                dynamic_filter_proto,
1511                right_schema.as_ref(),
1512                ctx,
1513            )?;
1514            let df = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
1515                .downcast::<DynamicFilterPhysicalExpr>()
1516                .map_err(|_| {
1517                    internal_datafusion_err!(
1518                        "HashJoinExec dynamic_filter did not decode to a DynamicFilterPhysicalExpr"
1519                    )
1520                })?;
1521            hash_join = hash_join.with_dynamic_filter_expr(df)?;
1522        }
1523
1524        Ok(Arc::new(hash_join))
1525    }
1526
1527    fn try_into_symmetric_hash_join_physical_plan(
1528        &self,
1529        sym_join: &protobuf::SymmetricHashJoinExecNode,
1530        ctx: &PhysicalPlanDecodeContext<'_>,
1531        proto_converter: &dyn PhysicalProtoConverterExtension,
1532    ) -> Result<Arc<dyn ExecutionPlan>> {
1533        let left = into_physical_plan(&sym_join.left, ctx, proto_converter)?;
1534        let right = into_physical_plan(&sym_join.right, ctx, proto_converter)?;
1535        let left_schema = left.schema();
1536        let right_schema = right.schema();
1537        let on = sym_join
1538            .on
1539            .iter()
1540            .map(|col| {
1541                let left = proto_converter.proto_to_physical_expr(
1542                    &col.left.clone().unwrap(),
1543                    left_schema.as_ref(),
1544                    ctx,
1545                )?;
1546                let right = proto_converter.proto_to_physical_expr(
1547                    &col.right.clone().unwrap(),
1548                    right_schema.as_ref(),
1549                    ctx,
1550                )?;
1551                Ok((left, right))
1552            })
1553            .collect::<Result<_>>()?;
1554        let join_type =
1555            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1556                proto_error(format!(
1557                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1558                    sym_join.join_type
1559                ))
1560            })?;
1561        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1562            .map_err(|_| {
1563                proto_error(format!(
1564                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
1565                    sym_join.null_equality
1566                ))
1567            })?;
1568        let filter = sym_join
1569            .filter
1570            .as_ref()
1571            .map(|f| {
1572                let schema = f
1573                    .schema
1574                    .as_ref()
1575                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1576                    .try_into()?;
1577
1578                let expression = proto_converter.proto_to_physical_expr(
1579                    f.expression.as_ref().ok_or_else(|| {
1580                        proto_error("Unexpected empty filter expression")
1581                    })?,
1582                    &schema,
1583                    ctx,
1584                )?;
1585                let column_indices = f.column_indices
1586                    .iter()
1587                    .map(|i| {
1588                        let side = protobuf::JoinSide::try_from(i.side)
1589                            .map_err(|_| proto_error(format!(
1590                                "Received a HashJoinNode message with JoinSide in Filter {}",
1591                                i.side))
1592                            )?;
1593
1594                        Ok(ColumnIndex {
1595                            index: i.index as usize,
1596                            side: side.into(),
1597                        })
1598                    })
1599                    .collect::<Result<_>>()?;
1600
1601                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1602            })
1603            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1604
1605        let left_sort_exprs = parse_physical_sort_exprs(
1606            &sym_join.left_sort_exprs,
1607            ctx,
1608            &left_schema,
1609            proto_converter,
1610        )?;
1611        let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1612
1613        let right_sort_exprs = parse_physical_sort_exprs(
1614            &sym_join.right_sort_exprs,
1615            ctx,
1616            &right_schema,
1617            proto_converter,
1618        )?;
1619        let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1620
1621        let partition_mode = protobuf::StreamPartitionMode::try_from(
1622            sym_join.partition_mode,
1623        )
1624        .map_err(|_| {
1625            proto_error(format!(
1626                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1627                sym_join.partition_mode
1628            ))
1629        })?;
1630        let partition_mode = match partition_mode {
1631            protobuf::StreamPartitionMode::SinglePartition => {
1632                StreamJoinPartitionMode::SinglePartition
1633            }
1634            protobuf::StreamPartitionMode::PartitionedExec => {
1635                StreamJoinPartitionMode::Partitioned
1636            }
1637        };
1638        SymmetricHashJoinExec::try_new(
1639            left,
1640            right,
1641            on,
1642            filter,
1643            &join_type.into(),
1644            null_equality.into(),
1645            left_sort_exprs,
1646            right_sort_exprs,
1647            partition_mode,
1648        )
1649        .map(|e| Arc::new(e) as _)
1650    }
1651
1652    fn try_into_union_physical_plan(
1653        &self,
1654        union: &protobuf::UnionExecNode,
1655        ctx: &PhysicalPlanDecodeContext<'_>,
1656        proto_converter: &dyn PhysicalProtoConverterExtension,
1657    ) -> Result<Arc<dyn ExecutionPlan>> {
1658        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1659        for input in &union.inputs {
1660            inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?);
1661        }
1662        UnionExec::try_new(inputs)
1663    }
1664
1665    fn try_into_interleave_physical_plan(
1666        &self,
1667        interleave: &protobuf::InterleaveExecNode,
1668        ctx: &PhysicalPlanDecodeContext<'_>,
1669        proto_converter: &dyn PhysicalProtoConverterExtension,
1670    ) -> Result<Arc<dyn ExecutionPlan>> {
1671        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1672        for input in &interleave.inputs {
1673            inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?);
1674        }
1675        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1676    }
1677
1678    fn try_into_cross_join_physical_plan(
1679        &self,
1680        crossjoin: &protobuf::CrossJoinExecNode,
1681        ctx: &PhysicalPlanDecodeContext<'_>,
1682        proto_converter: &dyn PhysicalProtoConverterExtension,
1683    ) -> Result<Arc<dyn ExecutionPlan>> {
1684        let left: Arc<dyn ExecutionPlan> =
1685            into_physical_plan(&crossjoin.left, ctx, proto_converter)?;
1686        let right: Arc<dyn ExecutionPlan> =
1687            into_physical_plan(&crossjoin.right, ctx, proto_converter)?;
1688        Ok(Arc::new(CrossJoinExec::new(left, right)))
1689    }
1690
1691    fn try_into_empty_physical_plan(
1692        &self,
1693        empty: &protobuf::EmptyExecNode,
1694        _ctx: &PhysicalPlanDecodeContext<'_>,
1695        _proto_converter: &dyn PhysicalProtoConverterExtension,
1696    ) -> Result<Arc<dyn ExecutionPlan>> {
1697        let schema = Arc::new(convert_required!(empty.schema)?);
1698        Ok(Arc::new(EmptyExec::new(schema)))
1699    }
1700
1701    fn try_into_placeholder_row_physical_plan(
1702        &self,
1703        placeholder: &protobuf::PlaceholderRowExecNode,
1704        _ctx: &PhysicalPlanDecodeContext<'_>,
1705    ) -> Result<Arc<dyn ExecutionPlan>> {
1706        let schema = Arc::new(convert_required!(placeholder.schema)?);
1707        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1708    }
1709
1710    fn try_into_sort_physical_plan(
1711        &self,
1712        sort: &protobuf::SortExecNode,
1713        ctx: &PhysicalPlanDecodeContext<'_>,
1714        proto_converter: &dyn PhysicalProtoConverterExtension,
1715    ) -> Result<Arc<dyn ExecutionPlan>> {
1716        let input = into_physical_plan(&sort.input, ctx, proto_converter)?;
1717        let exprs = sort
1718            .expr
1719            .iter()
1720            .map(|expr| {
1721                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1722                    proto_error(format!(
1723                        "physical_plan::from_proto() Unexpected expr {self:?}"
1724                    ))
1725                })?;
1726                if let ExprType::Sort(sort_expr) = expr {
1727                    let expr = sort_expr
1728                        .expr
1729                        .as_ref()
1730                        .ok_or_else(|| {
1731                            proto_error(format!(
1732                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
1733                            ))
1734                        })?
1735                        .as_ref();
1736                    Ok(PhysicalSortExpr {
1737                        expr: proto_converter.proto_to_physical_expr(
1738                            expr,
1739                            input.schema().as_ref(),
1740                            ctx,
1741                        )?,
1742                        options: SortOptions {
1743                            descending: !sort_expr.asc,
1744                            nulls_first: sort_expr.nulls_first,
1745                        },
1746                    })
1747                } else {
1748                    internal_err!(
1749                        "physical_plan::from_proto() {self:?}"
1750                    )
1751                }
1752            })
1753            .collect::<Result<Vec<_>>>()?;
1754        let Some(ordering) = LexOrdering::new(exprs) else {
1755            return internal_err!("SortExec requires an ordering");
1756        };
1757        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1758        let new_sort = SortExec::new(ordering, input)
1759            .with_fetch(fetch)
1760            .with_preserve_partitioning(sort.preserve_partitioning);
1761
1762        let new_sort = if let Some(dynamic_filter_proto) = &sort.dynamic_filter {
1763            let dynamic_filter_expr = proto_converter.proto_to_physical_expr(
1764                dynamic_filter_proto,
1765                new_sort.input().schema().as_ref(),
1766                ctx,
1767            )?;
1768            let df = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
1769                .downcast::<DynamicFilterPhysicalExpr>()
1770                .map_err(|_| {
1771                    internal_datafusion_err!(
1772                        "SortExec dynamic_filter did not decode to a DynamicFilterPhysicalExpr"
1773                    )
1774                })?;
1775            new_sort.with_dynamic_filter_expr(df)?
1776        } else {
1777            new_sort
1778        };
1779
1780        Ok(Arc::new(new_sort))
1781    }
1782
1783    fn try_into_sort_preserving_merge_physical_plan(
1784        &self,
1785        sort: &protobuf::SortPreservingMergeExecNode,
1786        ctx: &PhysicalPlanDecodeContext<'_>,
1787        proto_converter: &dyn PhysicalProtoConverterExtension,
1788    ) -> Result<Arc<dyn ExecutionPlan>> {
1789        let input = into_physical_plan(&sort.input, ctx, proto_converter)?;
1790        let exprs = sort
1791            .expr
1792            .iter()
1793            .map(|expr| {
1794                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1795                    proto_error(format!(
1796                        "physical_plan::from_proto() Unexpected expr {self:?}"
1797                    ))
1798                })?;
1799                if let ExprType::Sort(sort_expr) = expr {
1800                    let expr = sort_expr
1801                        .expr
1802                        .as_ref()
1803                        .ok_or_else(|| {
1804                            proto_error(format!(
1805                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1806                        ))
1807                        })?
1808                        .as_ref();
1809                    Ok(PhysicalSortExpr {
1810                        expr: proto_converter.proto_to_physical_expr(
1811                            expr,
1812                            input.schema().as_ref(),
1813                            ctx,
1814                        )?,
1815                        options: SortOptions {
1816                            descending: !sort_expr.asc,
1817                            nulls_first: sort_expr.nulls_first,
1818                        },
1819                    })
1820                } else {
1821                    internal_err!("physical_plan::from_proto() {self:?}")
1822                }
1823            })
1824            .collect::<Result<Vec<_>>>()?;
1825        let Some(ordering) = LexOrdering::new(exprs) else {
1826            return internal_err!("SortExec requires an ordering");
1827        };
1828        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1829        Ok(Arc::new(
1830            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1831        ))
1832    }
1833
1834    fn try_into_extension_physical_plan(
1835        &self,
1836        extension: &protobuf::PhysicalExtensionNode,
1837        ctx: &PhysicalPlanDecodeContext<'_>,
1838        proto_converter: &dyn PhysicalProtoConverterExtension,
1839    ) -> Result<Arc<dyn ExecutionPlan>> {
1840        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1841            .inputs
1842            .iter()
1843            .map(|i| proto_converter.proto_to_execution_plan(i, ctx))
1844            .collect::<Result<_>>()?;
1845
1846        let extension_node =
1847            ctx.codec()
1848                .try_decode(extension.node.as_slice(), &inputs, ctx.task_ctx())?;
1849
1850        Ok(extension_node)
1851    }
1852
1853    fn try_into_nested_loop_join_physical_plan(
1854        &self,
1855        join: &protobuf::NestedLoopJoinExecNode,
1856        ctx: &PhysicalPlanDecodeContext<'_>,
1857        proto_converter: &dyn PhysicalProtoConverterExtension,
1858    ) -> Result<Arc<dyn ExecutionPlan>> {
1859        let left: Arc<dyn ExecutionPlan> =
1860            into_physical_plan(&join.left, ctx, proto_converter)?;
1861        let right: Arc<dyn ExecutionPlan> =
1862            into_physical_plan(&join.right, ctx, proto_converter)?;
1863        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1864            proto_error(format!(
1865                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1866                join.join_type
1867            ))
1868        })?;
1869        let filter = join
1870                    .filter
1871                    .as_ref()
1872                    .map(|f| {
1873                        let schema = f
1874                            .schema
1875                            .as_ref()
1876                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1877                            .try_into()?;
1878
1879                        let expression = proto_converter
1880                            .proto_to_physical_expr(
1881                            f.expression.as_ref().ok_or_else(|| {
1882                                proto_error("Unexpected empty filter expression")
1883                            })?,
1884                            &schema,
1885                            ctx,
1886                        )?;
1887                        let column_indices = f.column_indices
1888                            .iter()
1889                            .map(|i| {
1890                                let side = protobuf::JoinSide::try_from(i.side)
1891                                    .map_err(|_| proto_error(format!(
1892                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1893                                        i.side))
1894                                    )?;
1895
1896                                Ok(ColumnIndex {
1897                                    index: i.index as usize,
1898                                    side: side.into(),
1899                                })
1900                            })
1901                            .collect::<Result<Vec<_>>>()?;
1902
1903                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1904                    })
1905                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1906
1907        let projection = if !join.projection.is_empty() {
1908            Some(
1909                join.projection
1910                    .iter()
1911                    .map(|i| *i as usize)
1912                    .collect::<Vec<_>>(),
1913            )
1914        } else {
1915            None
1916        };
1917
1918        Ok(Arc::new(NestedLoopJoinExec::try_new(
1919            left,
1920            right,
1921            filter,
1922            &join_type.into(),
1923            projection,
1924        )?))
1925    }
1926
1927    fn try_into_analyze_physical_plan(
1928        &self,
1929        analyze: &protobuf::AnalyzeExecNode,
1930        ctx: &PhysicalPlanDecodeContext<'_>,
1931        proto_converter: &dyn PhysicalProtoConverterExtension,
1932    ) -> Result<Arc<dyn ExecutionPlan>> {
1933        let input: Arc<dyn ExecutionPlan> =
1934            into_physical_plan(&analyze.input, ctx, proto_converter)?;
1935        let metric_categories = if analyze.has_metric_categories {
1936            let cats: Result<Vec<MetricCategory>> = analyze
1937                .metric_categories
1938                .iter()
1939                .map(|s| s.parse::<MetricCategory>())
1940                .collect();
1941            Some(cats?)
1942        } else {
1943            None
1944        };
1945        Ok(Arc::new(AnalyzeExec::new(
1946            analyze.verbose,
1947            analyze.show_statistics,
1948            vec![MetricType::Summary, MetricType::Dev],
1949            metric_categories,
1950            input,
1951            Arc::new(convert_required!(analyze.schema)?),
1952        )))
1953    }
1954
1955    fn try_into_json_sink_physical_plan(
1956        &self,
1957        sink: &protobuf::JsonSinkExecNode,
1958        ctx: &PhysicalPlanDecodeContext<'_>,
1959        proto_converter: &dyn PhysicalProtoConverterExtension,
1960    ) -> Result<Arc<dyn ExecutionPlan>> {
1961        let input = into_physical_plan(&sink.input, ctx, proto_converter)?;
1962
1963        let data_sink: JsonSink = sink
1964            .sink
1965            .as_ref()
1966            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1967            .try_into()?;
1968        let sink_schema = input.schema();
1969        let sort_order = sink
1970            .sort_order
1971            .as_ref()
1972            .map(|collection| {
1973                parse_physical_sort_exprs(
1974                    &collection.physical_sort_expr_nodes,
1975                    ctx,
1976                    &sink_schema,
1977                    proto_converter,
1978                )
1979                .map(|sort_exprs| {
1980                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1981                })
1982            })
1983            .transpose()?
1984            .flatten();
1985        Ok(Arc::new(DataSinkExec::new(
1986            input,
1987            Arc::new(data_sink),
1988            sort_order,
1989        )))
1990    }
1991
1992    fn try_into_csv_sink_physical_plan(
1993        &self,
1994        sink: &protobuf::CsvSinkExecNode,
1995        ctx: &PhysicalPlanDecodeContext<'_>,
1996        proto_converter: &dyn PhysicalProtoConverterExtension,
1997    ) -> Result<Arc<dyn ExecutionPlan>> {
1998        let input = into_physical_plan(&sink.input, ctx, proto_converter)?;
1999
2000        let data_sink: CsvSink = sink
2001            .sink
2002            .as_ref()
2003            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
2004            .try_into()?;
2005        let sink_schema = input.schema();
2006        let sort_order = sink
2007            .sort_order
2008            .as_ref()
2009            .map(|collection| {
2010                parse_physical_sort_exprs(
2011                    &collection.physical_sort_expr_nodes,
2012                    ctx,
2013                    &sink_schema,
2014                    proto_converter,
2015                )
2016                .map(|sort_exprs| {
2017                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
2018                })
2019            })
2020            .transpose()?
2021            .flatten();
2022        Ok(Arc::new(DataSinkExec::new(
2023            input,
2024            Arc::new(data_sink),
2025            sort_order,
2026        )))
2027    }
2028
2029    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
2030    fn try_into_parquet_sink_physical_plan(
2031        &self,
2032        sink: &protobuf::ParquetSinkExecNode,
2033        ctx: &PhysicalPlanDecodeContext<'_>,
2034        proto_converter: &dyn PhysicalProtoConverterExtension,
2035    ) -> Result<Arc<dyn ExecutionPlan>> {
2036        #[cfg(feature = "parquet")]
2037        {
2038            let input = into_physical_plan(&sink.input, ctx, proto_converter)?;
2039
2040            let data_sink: ParquetSink = sink
2041                .sink
2042                .as_ref()
2043                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
2044                .try_into()?;
2045            let sink_schema = input.schema();
2046            let sort_order = sink
2047                .sort_order
2048                .as_ref()
2049                .map(|collection| {
2050                    parse_physical_sort_exprs(
2051                        &collection.physical_sort_expr_nodes,
2052                        ctx,
2053                        &sink_schema,
2054                        proto_converter,
2055                    )
2056                    .map(|sort_exprs| {
2057                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
2058                    })
2059                })
2060                .transpose()?
2061                .flatten();
2062            Ok(Arc::new(DataSinkExec::new(
2063                input,
2064                Arc::new(data_sink),
2065                sort_order,
2066            )))
2067        }
2068        #[cfg(not(feature = "parquet"))]
2069        panic!("Trying to use ParquetSink without `parquet` feature enabled");
2070    }
2071
2072    fn try_into_unnest_physical_plan(
2073        &self,
2074        unnest: &protobuf::UnnestExecNode,
2075        ctx: &PhysicalPlanDecodeContext<'_>,
2076        proto_converter: &dyn PhysicalProtoConverterExtension,
2077    ) -> Result<Arc<dyn ExecutionPlan>> {
2078        let input = into_physical_plan(&unnest.input, ctx, proto_converter)?;
2079
2080        Ok(Arc::new(UnnestExec::new(
2081            input,
2082            unnest
2083                .list_type_columns
2084                .iter()
2085                .map(|c| ListUnnest {
2086                    index_in_input_schema: c.index_in_input_schema as _,
2087                    depth: c.depth as _,
2088                })
2089                .collect(),
2090            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
2091            Arc::new(convert_required!(unnest.schema)?),
2092            into_required!(unnest.options)?,
2093        )?))
2094    }
2095
2096    fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
2097        match name {
2098            protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
2099            protobuf::GenerateSeriesName::GsRange => "range",
2100        }
2101    }
2102    fn try_into_sort_join(
2103        &self,
2104        sort_join: &SortMergeJoinExecNode,
2105        ctx: &PhysicalPlanDecodeContext<'_>,
2106        proto_converter: &dyn PhysicalProtoConverterExtension,
2107    ) -> Result<Arc<dyn ExecutionPlan>> {
2108        let left = into_physical_plan(&sort_join.left, ctx, proto_converter)?;
2109        let left_schema = left.schema();
2110        let right = into_physical_plan(&sort_join.right, ctx, proto_converter)?;
2111        let right_schema = right.schema();
2112
2113        let filter = sort_join
2114            .filter
2115            .as_ref()
2116            .map(|f| {
2117                let schema = f
2118                    .schema
2119                    .as_ref()
2120                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
2121                    .try_into()?;
2122
2123                let expression = proto_converter.proto_to_physical_expr(
2124                    f.expression.as_ref().ok_or_else(|| {
2125                        proto_error("Unexpected empty filter expression")
2126                    })?,
2127                    &schema,
2128                    ctx,
2129                )?;
2130                let column_indices = f
2131                    .column_indices
2132                    .iter()
2133                    .map(|i| {
2134                        let side =
2135                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
2136                                proto_error(format!(
2137                                    "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
2138                                    i.side
2139                                ))
2140                            })?;
2141
2142                        Ok(ColumnIndex {
2143                            index: i.index as usize,
2144                            side: side.into(),
2145                        })
2146                    })
2147                    .collect::<Result<Vec<_>>>()?;
2148
2149                Ok(JoinFilter::new(
2150                    expression,
2151                    column_indices,
2152                    Arc::new(schema),
2153                ))
2154            })
2155            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
2156
2157        let join_type =
2158            protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
2159                proto_error(format!(
2160                    "Received a SortMergeJoinExecNode message with unknown JoinType {}",
2161                    sort_join.join_type
2162                ))
2163            })?;
2164
2165        let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
2166            .map_err(|_| {
2167                proto_error(format!(
2168                    "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
2169                    sort_join.null_equality
2170                ))
2171            })?;
2172
2173        let sort_options = sort_join
2174            .sort_options
2175            .iter()
2176            .map(|e| SortOptions {
2177                descending: !e.asc,
2178                nulls_first: e.nulls_first,
2179            })
2180            .collect();
2181        let on = sort_join
2182            .on
2183            .iter()
2184            .map(|col| {
2185                let left = proto_converter.proto_to_physical_expr(
2186                    &col.left.clone().unwrap(),
2187                    left_schema.as_ref(),
2188                    ctx,
2189                )?;
2190                let right = proto_converter.proto_to_physical_expr(
2191                    &col.right.clone().unwrap(),
2192                    right_schema.as_ref(),
2193                    ctx,
2194                )?;
2195                Ok((left, right))
2196            })
2197            .collect::<Result<_>>()?;
2198
2199        Ok(Arc::new(SortMergeJoinExec::try_new(
2200            left,
2201            right,
2202            on,
2203            filter,
2204            join_type.into(),
2205            sort_options,
2206            null_equality.into(),
2207        )?))
2208    }
2209
2210    fn try_into_generate_series_physical_plan(
2211        &self,
2212        generate_series: &protobuf::GenerateSeriesNode,
2213    ) -> Result<Arc<dyn ExecutionPlan>> {
2214        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);
2215
2216        let args = match &generate_series.args {
2217            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
2218                GenSeriesArgs::ContainsNull {
2219                    name: Self::generate_series_name_to_str(args.name()),
2220                }
2221            }
2222            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
2223                GenSeriesArgs::Int64Args {
2224                    start: args.start,
2225                    end: args.end,
2226                    step: args.step,
2227                    include_end: args.include_end,
2228                    name: Self::generate_series_name_to_str(args.name()),
2229                }
2230            }
2231            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
2232                let step_proto = args.step.as_ref().ok_or_else(|| {
2233                    internal_datafusion_err!("Missing step in TimestampArgs")
2234                })?;
2235                let step = IntervalMonthDayNanoType::make_value(
2236                    step_proto.months,
2237                    step_proto.days,
2238                    step_proto.nanos,
2239                );
2240                GenSeriesArgs::TimestampArgs {
2241                    start: args.start,
2242                    end: args.end,
2243                    step,
2244                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
2245                    include_end: args.include_end,
2246                    name: Self::generate_series_name_to_str(args.name()),
2247                }
2248            }
2249            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
2250                let step_proto = args.step.as_ref().ok_or_else(|| {
2251                    internal_datafusion_err!("Missing step in DateArgs")
2252                })?;
2253                let step = IntervalMonthDayNanoType::make_value(
2254                    step_proto.months,
2255                    step_proto.days,
2256                    step_proto.nanos,
2257                );
2258                GenSeriesArgs::DateArgs {
2259                    start: args.start,
2260                    end: args.end,
2261                    step,
2262                    include_end: args.include_end,
2263                    name: Self::generate_series_name_to_str(args.name()),
2264                }
2265            }
2266            None => return internal_err!("Missing args in GenerateSeriesNode"),
2267        };
2268
2269        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
2270        let generator = table.as_generator(generate_series.target_batch_size as usize)?;
2271
2272        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
2273    }
2274
2275    fn try_into_cooperative_physical_plan(
2276        &self,
2277        field_stream: &protobuf::CooperativeExecNode,
2278        ctx: &PhysicalPlanDecodeContext<'_>,
2279        proto_converter: &dyn PhysicalProtoConverterExtension,
2280    ) -> Result<Arc<dyn ExecutionPlan>> {
2281        let input = into_physical_plan(&field_stream.input, ctx, proto_converter)?;
2282        Ok(Arc::new(CooperativeExec::new(input)))
2283    }
2284
2285    fn try_into_async_func_physical_plan(
2286        &self,
2287        async_func: &protobuf::AsyncFuncExecNode,
2288        ctx: &PhysicalPlanDecodeContext<'_>,
2289        proto_converter: &dyn PhysicalProtoConverterExtension,
2290    ) -> Result<Arc<dyn ExecutionPlan>> {
2291        let input: Arc<dyn ExecutionPlan> =
2292            into_physical_plan(&async_func.input, ctx, proto_converter)?;
2293
2294        if async_func.async_exprs.len() != async_func.async_expr_names.len() {
2295            return internal_err!(
2296                "AsyncFuncExecNode async_exprs length does not match async_expr_names"
2297            );
2298        }
2299
2300        let async_exprs = async_func
2301            .async_exprs
2302            .iter()
2303            .zip(async_func.async_expr_names.iter())
2304            .map(|(expr, name)| {
2305                let physical_expr = proto_converter.proto_to_physical_expr(
2306                    expr,
2307                    input.schema().as_ref(),
2308                    ctx,
2309                )?;
2310
2311                Ok(Arc::new(AsyncFuncExpr::try_new(
2312                    name.clone(),
2313                    physical_expr,
2314                    input.schema().as_ref(),
2315                )?))
2316            })
2317            .collect::<Result<Vec<_>>>()?;
2318
2319        Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
2320    }
2321
2322    fn try_into_buffer_physical_plan(
2323        &self,
2324        buffer: &protobuf::BufferExecNode,
2325        ctx: &PhysicalPlanDecodeContext<'_>,
2326        proto_converter: &dyn PhysicalProtoConverterExtension,
2327    ) -> Result<Arc<dyn ExecutionPlan>> {
2328        let input: Arc<dyn ExecutionPlan> =
2329            into_physical_plan(&buffer.input, ctx, proto_converter)?;
2330
2331        Ok(Arc::new(BufferExec::new(input, buffer.capacity as usize)))
2332    }
2333
2334    fn try_into_scalar_subquery_physical_plan(
2335        &self,
2336        sq: &protobuf::ScalarSubqueryExecNode,
2337        ctx: &PhysicalPlanDecodeContext<'_>,
2338        proto_converter: &dyn PhysicalProtoConverterExtension,
2339    ) -> Result<Arc<dyn ExecutionPlan>> {
2340        // First, deserialize the main input plan. We set up the subquery results
2341        // container first, so that ScalarSubqueryExpr nodes can reference it.
2342        let subquery_results = ScalarSubqueryResults::new(sq.subqueries.len());
2343        let input_ctx = ctx.with_scalar_subquery_results(subquery_results.clone());
2344        let input = into_physical_plan(&sq.input, &input_ctx, proto_converter)?;
2345
2346        // Now deserialize the subquery children.
2347        let subqueries: Vec<ScalarSubqueryLink> = sq
2348            .subqueries
2349            .iter()
2350            .enumerate()
2351            .map(|(index, sq_plan)| {
2352                let plan =
2353                    sq_plan.try_into_physical_plan_with_context(ctx, proto_converter)?;
2354                Ok(ScalarSubqueryLink {
2355                    plan,
2356                    index: SubqueryIndex::new(index),
2357                })
2358            })
2359            .collect::<Result<Vec<_>>>()?;
2360
2361        Ok(Arc::new(ScalarSubqueryExec::new(
2362            input,
2363            subqueries,
2364            subquery_results,
2365        )))
2366    }
2367
2368    fn try_from_explain_exec(
2369        exec: &ExplainExec,
2370        _codec: &dyn PhysicalExtensionCodec,
2371    ) -> Result<Self> {
2372        Ok(protobuf::PhysicalPlanNode {
2373            physical_plan_type: Some(PhysicalPlanType::Explain(
2374                protobuf::ExplainExecNode {
2375                    schema: Some(exec.schema().as_ref().try_into()?),
2376                    stringified_plans: exec
2377                        .stringified_plans()
2378                        .iter()
2379                        .map(|plan| plan.into())
2380                        .collect(),
2381                    verbose: exec.verbose(),
2382                },
2383            )),
2384        })
2385    }
2386
2387    fn try_from_projection_exec(
2388        exec: &ProjectionExec,
2389        codec: &dyn PhysicalExtensionCodec,
2390        proto_converter: &dyn PhysicalProtoConverterExtension,
2391    ) -> Result<Self> {
2392        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2393            exec.input().to_owned(),
2394            codec,
2395            proto_converter,
2396        )?;
2397        let expr = exec
2398            .expr()
2399            .iter()
2400            .map(|proj_expr| {
2401                proto_converter.physical_expr_to_proto(&proj_expr.expr, codec)
2402            })
2403            .collect::<Result<Vec<_>>>()?;
2404        let expr_name = exec
2405            .expr()
2406            .iter()
2407            .map(|proj_expr| proj_expr.alias.clone())
2408            .collect();
2409        Ok(protobuf::PhysicalPlanNode {
2410            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
2411                protobuf::ProjectionExecNode {
2412                    input: Some(Box::new(input)),
2413                    expr,
2414                    expr_name,
2415                },
2416            ))),
2417        })
2418    }
2419
2420    fn try_from_analyze_exec(
2421        exec: &AnalyzeExec,
2422        codec: &dyn PhysicalExtensionCodec,
2423        proto_converter: &dyn PhysicalProtoConverterExtension,
2424    ) -> Result<Self> {
2425        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2426            exec.input().to_owned(),
2427            codec,
2428            proto_converter,
2429        )?;
2430        let (has_metric_categories, metric_categories) = match exec.metric_categories() {
2431            Some(cats) => (true, cats.iter().map(|c| c.to_string()).collect()),
2432            None => (false, vec![]),
2433        };
2434        Ok(protobuf::PhysicalPlanNode {
2435            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2436                protobuf::AnalyzeExecNode {
2437                    verbose: exec.verbose(),
2438                    show_statistics: exec.show_statistics(),
2439                    input: Some(Box::new(input)),
2440                    schema: Some(exec.schema().as_ref().try_into()?),
2441                    has_metric_categories,
2442                    metric_categories,
2443                },
2444            ))),
2445        })
2446    }
2447
2448    fn try_from_filter_exec(
2449        exec: &FilterExec,
2450        codec: &dyn PhysicalExtensionCodec,
2451        proto_converter: &dyn PhysicalProtoConverterExtension,
2452    ) -> Result<Self> {
2453        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2454            exec.input().to_owned(),
2455            codec,
2456            proto_converter,
2457        )?;
2458        Ok(protobuf::PhysicalPlanNode {
2459            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2460                protobuf::FilterExecNode {
2461                    input: Some(Box::new(input)),
2462                    expr: Some(
2463                        proto_converter
2464                            .physical_expr_to_proto(exec.predicate(), codec)?,
2465                    ),
2466                    default_filter_selectivity: exec.default_selectivity() as u32,
2467                    projection: match exec.projection() {
2468                        None => (0..exec.input().schema().fields().len())
2469                            .map(|i| i as u32)
2470                            .collect(),
2471                        Some(v) => v.iter().map(|x| *x as u32).collect(),
2472                    },
2473                    batch_size: exec.batch_size() as u32,
2474                    fetch: exec.fetch().map(|f| f as u32),
2475                },
2476            ))),
2477        })
2478    }
2479
2480    fn try_from_global_limit_exec(
2481        limit: &GlobalLimitExec,
2482        codec: &dyn PhysicalExtensionCodec,
2483        proto_converter: &dyn PhysicalProtoConverterExtension,
2484    ) -> Result<Self> {
2485        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2486            limit.input().to_owned(),
2487            codec,
2488            proto_converter,
2489        )?;
2490
2491        Ok(protobuf::PhysicalPlanNode {
2492            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2493                protobuf::GlobalLimitExecNode {
2494                    input: Some(Box::new(input)),
2495                    skip: limit.skip() as u32,
2496                    fetch: match limit.fetch() {
2497                        Some(n) => n as i64,
2498                        _ => -1, // no limit
2499                    },
2500                },
2501            ))),
2502        })
2503    }
2504
2505    fn try_from_local_limit_exec(
2506        limit: &LocalLimitExec,
2507        codec: &dyn PhysicalExtensionCodec,
2508        proto_converter: &dyn PhysicalProtoConverterExtension,
2509    ) -> Result<Self> {
2510        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2511            limit.input().to_owned(),
2512            codec,
2513            proto_converter,
2514        )?;
2515        Ok(protobuf::PhysicalPlanNode {
2516            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2517                protobuf::LocalLimitExecNode {
2518                    input: Some(Box::new(input)),
2519                    fetch: limit.fetch() as u32,
2520                },
2521            ))),
2522        })
2523    }
2524
2525    fn try_from_hash_join_exec(
2526        exec: &HashJoinExec,
2527        codec: &dyn PhysicalExtensionCodec,
2528        proto_converter: &dyn PhysicalProtoConverterExtension,
2529    ) -> Result<Self> {
2530        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2531            exec.left().to_owned(),
2532            codec,
2533            proto_converter,
2534        )?;
2535        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2536            exec.right().to_owned(),
2537            codec,
2538            proto_converter,
2539        )?;
2540        let on: Vec<protobuf::JoinOn> = exec
2541            .on()
2542            .iter()
2543            .map(|tuple| {
2544                let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2545                let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2546                Ok::<_, DataFusionError>(protobuf::JoinOn {
2547                    left: Some(l),
2548                    right: Some(r),
2549                })
2550            })
2551            .collect::<Result<_>>()?;
2552        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2553        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2554        let filter = exec
2555            .filter()
2556            .as_ref()
2557            .map(|f| {
2558                let expression =
2559                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2560                let column_indices = f
2561                    .column_indices()
2562                    .iter()
2563                    .map(|i| {
2564                        let side: protobuf::JoinSide = i.side.to_owned().into();
2565                        protobuf::ColumnIndex {
2566                            index: i.index as u32,
2567                            side: side.into(),
2568                        }
2569                    })
2570                    .collect();
2571                let schema = f.schema().as_ref().try_into()?;
2572                Ok(protobuf::JoinFilter {
2573                    expression: Some(expression),
2574                    column_indices,
2575                    schema: Some(schema),
2576                })
2577            })
2578            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2579
2580        let partition_mode = match exec.partition_mode() {
2581            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2582            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2583            PartitionMode::Auto => protobuf::PartitionMode::Auto,
2584        };
2585
2586        let dynamic_filter = exec
2587            .dynamic_filter_expr()
2588            .map(|df| {
2589                let df_expr: Arc<dyn PhysicalExpr> =
2590                    Arc::clone(df) as Arc<dyn PhysicalExpr>;
2591                proto_converter.physical_expr_to_proto(&df_expr, codec)
2592            })
2593            .transpose()?;
2594
2595        Ok(protobuf::PhysicalPlanNode {
2596            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2597                protobuf::HashJoinExecNode {
2598                    left: Some(Box::new(left)),
2599                    right: Some(Box::new(right)),
2600                    on,
2601                    join_type: join_type.into(),
2602                    partition_mode: partition_mode.into(),
2603                    null_equality: null_equality.into(),
2604                    filter,
2605                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2606                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2607                    }),
2608                    null_aware: exec.null_aware,
2609                    dynamic_filter,
2610                },
2611            ))),
2612        })
2613    }
2614
2615    fn try_from_symmetric_hash_join_exec(
2616        exec: &SymmetricHashJoinExec,
2617        codec: &dyn PhysicalExtensionCodec,
2618        proto_converter: &dyn PhysicalProtoConverterExtension,
2619    ) -> Result<Self> {
2620        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2621            exec.left().to_owned(),
2622            codec,
2623            proto_converter,
2624        )?;
2625        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2626            exec.right().to_owned(),
2627            codec,
2628            proto_converter,
2629        )?;
2630        let on = exec
2631            .on()
2632            .iter()
2633            .map(|tuple| {
2634                let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2635                let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2636                Ok::<_, DataFusionError>(protobuf::JoinOn {
2637                    left: Some(l),
2638                    right: Some(r),
2639                })
2640            })
2641            .collect::<Result<_>>()?;
2642        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2643        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2644        let filter = exec
2645            .filter()
2646            .as_ref()
2647            .map(|f| {
2648                let expression =
2649                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2650                let column_indices = f
2651                    .column_indices()
2652                    .iter()
2653                    .map(|i| {
2654                        let side: protobuf::JoinSide = i.side.to_owned().into();
2655                        protobuf::ColumnIndex {
2656                            index: i.index as u32,
2657                            side: side.into(),
2658                        }
2659                    })
2660                    .collect();
2661                let schema = f.schema().as_ref().try_into()?;
2662                Ok(protobuf::JoinFilter {
2663                    expression: Some(expression),
2664                    column_indices,
2665                    schema: Some(schema),
2666                })
2667            })
2668            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2669
2670        let partition_mode = match exec.partition_mode() {
2671            StreamJoinPartitionMode::SinglePartition => {
2672                protobuf::StreamPartitionMode::SinglePartition
2673            }
2674            StreamJoinPartitionMode::Partitioned => {
2675                protobuf::StreamPartitionMode::PartitionedExec
2676            }
2677        };
2678
2679        let left_sort_exprs = exec
2680            .left_sort_exprs()
2681            .map(|exprs| {
2682                exprs
2683                    .iter()
2684                    .map(|expr| {
2685                        Ok(protobuf::PhysicalSortExprNode {
2686                            expr: Some(Box::new(
2687                                proto_converter
2688                                    .physical_expr_to_proto(&expr.expr, codec)?,
2689                            )),
2690                            asc: !expr.options.descending,
2691                            nulls_first: expr.options.nulls_first,
2692                        })
2693                    })
2694                    .collect::<Result<Vec<_>>>()
2695            })
2696            .transpose()?
2697            .unwrap_or(vec![]);
2698
2699        let right_sort_exprs = exec
2700            .right_sort_exprs()
2701            .map(|exprs| {
2702                exprs
2703                    .iter()
2704                    .map(|expr| {
2705                        Ok(protobuf::PhysicalSortExprNode {
2706                            expr: Some(Box::new(
2707                                proto_converter
2708                                    .physical_expr_to_proto(&expr.expr, codec)?,
2709                            )),
2710                            asc: !expr.options.descending,
2711                            nulls_first: expr.options.nulls_first,
2712                        })
2713                    })
2714                    .collect::<Result<Vec<_>>>()
2715            })
2716            .transpose()?
2717            .unwrap_or(vec![]);
2718
2719        Ok(protobuf::PhysicalPlanNode {
2720            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2721                protobuf::SymmetricHashJoinExecNode {
2722                    left: Some(Box::new(left)),
2723                    right: Some(Box::new(right)),
2724                    on,
2725                    join_type: join_type.into(),
2726                    partition_mode: partition_mode.into(),
2727                    null_equality: null_equality.into(),
2728                    left_sort_exprs,
2729                    right_sort_exprs,
2730                    filter,
2731                },
2732            ))),
2733        })
2734    }
2735
2736    fn try_from_sort_merge_join_exec(
2737        exec: &SortMergeJoinExec,
2738        codec: &dyn PhysicalExtensionCodec,
2739        proto_converter: &dyn PhysicalProtoConverterExtension,
2740    ) -> Result<Self> {
2741        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2742            exec.left().to_owned(),
2743            codec,
2744            proto_converter,
2745        )?;
2746        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2747            exec.right().to_owned(),
2748            codec,
2749            proto_converter,
2750        )?;
2751        let on = exec
2752            .on()
2753            .iter()
2754            .map(|tuple| {
2755                let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2756                let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2757                Ok::<_, DataFusionError>(protobuf::JoinOn {
2758                    left: Some(l),
2759                    right: Some(r),
2760                })
2761            })
2762            .collect::<Result<_>>()?;
2763        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2764        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2765        let filter = exec
2766            .filter()
2767            .as_ref()
2768            .map(|f| {
2769                let expression =
2770                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2771                let column_indices = f
2772                    .column_indices()
2773                    .iter()
2774                    .map(|i| {
2775                        let side: protobuf::JoinSide = i.side.to_owned().into();
2776                        protobuf::ColumnIndex {
2777                            index: i.index as u32,
2778                            side: side.into(),
2779                        }
2780                    })
2781                    .collect();
2782                let schema = f.schema().as_ref().try_into()?;
2783                Ok(protobuf::JoinFilter {
2784                    expression: Some(expression),
2785                    column_indices,
2786                    schema: Some(schema),
2787                })
2788            })
2789            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2790
2791        let sort_options = exec
2792            .sort_options()
2793            .iter()
2794            .map(
2795                |SortOptions {
2796                     descending,
2797                     nulls_first,
2798                 }| {
2799                    SortExprNode {
2800                        expr: None,
2801                        asc: !*descending,
2802                        nulls_first: *nulls_first,
2803                    }
2804                },
2805            )
2806            .collect();
2807
2808        Ok(protobuf::PhysicalPlanNode {
2809            physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2810                SortMergeJoinExecNode {
2811                    left: Some(Box::new(left)),
2812                    right: Some(Box::new(right)),
2813                    on,
2814                    join_type: join_type.into(),
2815                    null_equality: null_equality.into(),
2816                    filter,
2817                    sort_options,
2818                },
2819            ))),
2820        })
2821    }
2822
2823    fn try_from_cross_join_exec(
2824        exec: &CrossJoinExec,
2825        codec: &dyn PhysicalExtensionCodec,
2826        proto_converter: &dyn PhysicalProtoConverterExtension,
2827    ) -> Result<Self> {
2828        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2829            exec.left().to_owned(),
2830            codec,
2831            proto_converter,
2832        )?;
2833        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2834            exec.right().to_owned(),
2835            codec,
2836            proto_converter,
2837        )?;
2838        Ok(protobuf::PhysicalPlanNode {
2839            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2840                protobuf::CrossJoinExecNode {
2841                    left: Some(Box::new(left)),
2842                    right: Some(Box::new(right)),
2843                },
2844            ))),
2845        })
2846    }
2847
2848    fn try_from_aggregate_exec(
2849        exec: &AggregateExec,
2850        codec: &dyn PhysicalExtensionCodec,
2851        proto_converter: &dyn PhysicalProtoConverterExtension,
2852    ) -> Result<Self> {
2853        let groups: Vec<bool> = exec
2854            .group_expr()
2855            .groups()
2856            .iter()
2857            .flatten()
2858            .copied()
2859            .collect();
2860
2861        let group_names = exec
2862            .group_expr()
2863            .expr()
2864            .iter()
2865            .map(|expr| expr.1.to_owned())
2866            .collect();
2867
2868        let filter = exec
2869            .filter_expr()
2870            .iter()
2871            .map(|expr| serialize_maybe_filter(expr.to_owned(), codec, proto_converter))
2872            .collect::<Result<Vec<_>>>()?;
2873
2874        let agg = exec
2875            .aggr_expr()
2876            .iter()
2877            .map(|expr| {
2878                serialize_physical_aggr_expr(expr.to_owned(), codec, proto_converter)
2879            })
2880            .collect::<Result<Vec<_>>>()?;
2881
2882        let agg_names = exec
2883            .aggr_expr()
2884            .iter()
2885            .map(|expr| expr.name().to_string())
2886            .collect::<Vec<_>>();
2887
2888        let agg_mode = match exec.mode() {
2889            AggregateMode::Partial => protobuf::AggregateMode::Partial,
2890            AggregateMode::Final => protobuf::AggregateMode::Final,
2891            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2892            AggregateMode::Single => protobuf::AggregateMode::Single,
2893            AggregateMode::SinglePartitioned => {
2894                protobuf::AggregateMode::SinglePartitioned
2895            }
2896            AggregateMode::PartialReduce => protobuf::AggregateMode::PartialReduce,
2897        };
2898        let input_schema = exec.input_schema();
2899        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2900            exec.input().to_owned(),
2901            codec,
2902            proto_converter,
2903        )?;
2904
2905        let null_expr = exec
2906            .group_expr()
2907            .null_expr()
2908            .iter()
2909            .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
2910            .collect::<Result<Vec<_>>>()?;
2911
2912        let group_expr = exec
2913            .group_expr()
2914            .expr()
2915            .iter()
2916            .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
2917            .collect::<Result<Vec<_>>>()?;
2918
2919        let limit = exec.limit_options().map(|config| protobuf::AggLimit {
2920            limit: config.limit() as u64,
2921            descending: config.descending(),
2922        });
2923
2924        Ok(protobuf::PhysicalPlanNode {
2925            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2926                protobuf::AggregateExecNode {
2927                    group_expr,
2928                    group_expr_name: group_names,
2929                    aggr_expr: agg,
2930                    filter_expr: filter,
2931                    aggr_expr_name: agg_names,
2932                    mode: agg_mode as i32,
2933                    input: Some(Box::new(input)),
2934                    input_schema: Some(input_schema.as_ref().try_into()?),
2935                    null_expr,
2936                    groups,
2937                    limit,
2938                    has_grouping_set: exec.group_expr().has_grouping_set(),
2939                    dynamic_filter: exec
2940                        .dynamic_filter_expr()
2941                        .map(|df| {
2942                            let df_expr: Arc<dyn PhysicalExpr> =
2943                                Arc::clone(df) as Arc<dyn PhysicalExpr>;
2944                            proto_converter.physical_expr_to_proto(&df_expr, codec)
2945                        })
2946                        .transpose()?,
2947                },
2948            ))),
2949        })
2950    }
2951
2952    fn try_from_empty_exec(
2953        empty: &EmptyExec,
2954        _codec: &dyn PhysicalExtensionCodec,
2955    ) -> Result<Self> {
2956        let schema = empty.schema().as_ref().try_into()?;
2957        Ok(protobuf::PhysicalPlanNode {
2958            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2959                schema: Some(schema),
2960            })),
2961        })
2962    }
2963
2964    fn try_from_placeholder_row_exec(
2965        empty: &PlaceholderRowExec,
2966        _codec: &dyn PhysicalExtensionCodec,
2967    ) -> Result<Self> {
2968        let schema = empty.schema().as_ref().try_into()?;
2969        Ok(protobuf::PhysicalPlanNode {
2970            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2971                protobuf::PlaceholderRowExecNode {
2972                    schema: Some(schema),
2973                },
2974            )),
2975        })
2976    }
2977
2978    #[expect(deprecated)]
2979    fn try_from_coalesce_batches_exec(
2980        coalesce_batches: &CoalesceBatchesExec,
2981        codec: &dyn PhysicalExtensionCodec,
2982        proto_converter: &dyn PhysicalProtoConverterExtension,
2983    ) -> Result<Self> {
2984        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2985            coalesce_batches.input().to_owned(),
2986            codec,
2987            proto_converter,
2988        )?;
2989        Ok(protobuf::PhysicalPlanNode {
2990            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2991                protobuf::CoalesceBatchesExecNode {
2992                    input: Some(Box::new(input)),
2993                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2994                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2995                },
2996            ))),
2997        })
2998    }
2999
3000    fn try_from_data_source_exec(
3001        data_source_exec: &DataSourceExec,
3002        codec: &dyn PhysicalExtensionCodec,
3003        proto_converter: &dyn PhysicalProtoConverterExtension,
3004    ) -> Result<Option<Self>> {
3005        let data_source = data_source_exec.data_source();
3006        if let Some(maybe_csv) = data_source.downcast_ref::<FileScanConfig>() {
3007            let source = maybe_csv.file_source();
3008            if let Some(csv_config) = source.downcast_ref::<CsvSource>() {
3009                return Ok(Some(protobuf::PhysicalPlanNode {
3010                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
3011                        protobuf::CsvScanExecNode {
3012                            base_conf: Some(serialize_file_scan_config(
3013                                maybe_csv,
3014                                codec,
3015                                proto_converter,
3016                            )?),
3017                            has_header: csv_config.has_header(),
3018                            delimiter: byte_to_string(
3019                                csv_config.delimiter(),
3020                                "delimiter",
3021                            )?,
3022                            quote: byte_to_string(csv_config.quote(), "quote")?,
3023                            optional_escape: if let Some(escape) = csv_config.escape() {
3024                                Some(
3025                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
3026                                        byte_to_string(escape, "escape")?,
3027                                    ),
3028                                )
3029                            } else {
3030                                None
3031                            },
3032                            optional_comment: if let Some(comment) = csv_config.comment()
3033                            {
3034                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
3035                                        byte_to_string(comment, "comment")?,
3036                                    ))
3037                            } else {
3038                                None
3039                            },
3040                            newlines_in_values: csv_config.newlines_in_values(),
3041                            truncate_rows: csv_config.truncate_rows(),
3042                        },
3043                    )),
3044                }));
3045            }
3046        }
3047
3048        if let Some(scan_conf) = data_source.downcast_ref::<FileScanConfig>() {
3049            let source = scan_conf.file_source();
3050            if let Some(_json_source) = source.downcast_ref::<JsonSource>() {
3051                return Ok(Some(protobuf::PhysicalPlanNode {
3052                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
3053                        protobuf::JsonScanExecNode {
3054                            base_conf: Some(serialize_file_scan_config(
3055                                scan_conf,
3056                                codec,
3057                                proto_converter,
3058                            )?),
3059                        },
3060                    )),
3061                }));
3062            }
3063        }
3064
3065        if let Some(scan_conf) = data_source.downcast_ref::<FileScanConfig>() {
3066            let source = scan_conf.file_source();
3067            if let Some(_arrow_source) = source.downcast_ref::<ArrowSource>() {
3068                return Ok(Some(protobuf::PhysicalPlanNode {
3069                    physical_plan_type: Some(PhysicalPlanType::ArrowScan(
3070                        protobuf::ArrowScanExecNode {
3071                            base_conf: Some(serialize_file_scan_config(
3072                                scan_conf,
3073                                codec,
3074                                proto_converter,
3075                            )?),
3076                        },
3077                    )),
3078                }));
3079            }
3080        }
3081
3082        #[cfg(feature = "parquet")]
3083        if let Some((maybe_parquet, conf)) =
3084            data_source_exec.downcast_to_file_source::<ParquetSource>()
3085        {
3086            let predicate = conf
3087                .filter()
3088                .map(|pred| proto_converter.physical_expr_to_proto(&pred, codec))
3089                .transpose()?;
3090            return Ok(Some(protobuf::PhysicalPlanNode {
3091                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
3092                    protobuf::ParquetScanExecNode {
3093                        base_conf: Some(serialize_file_scan_config(
3094                            maybe_parquet,
3095                            codec,
3096                            proto_converter,
3097                        )?),
3098                        predicate,
3099                        parquet_options: Some(conf.table_parquet_options().try_into()?),
3100                    },
3101                )),
3102            }));
3103        }
3104
3105        #[cfg(feature = "avro")]
3106        if let Some(maybe_avro) = data_source.downcast_ref::<FileScanConfig>() {
3107            let source = maybe_avro.file_source();
3108            if source.downcast_ref::<AvroSource>().is_some() {
3109                return Ok(Some(protobuf::PhysicalPlanNode {
3110                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
3111                        protobuf::AvroScanExecNode {
3112                            base_conf: Some(serialize_file_scan_config(
3113                                maybe_avro,
3114                                codec,
3115                                proto_converter,
3116                            )?),
3117                        },
3118                    )),
3119                }));
3120            }
3121        }
3122
3123        if let Some(source_conf) = data_source.downcast_ref::<MemorySourceConfig>() {
3124            let proto_partitions = source_conf
3125                .partitions()
3126                .iter()
3127                .map(|p| serialize_record_batches(p))
3128                .collect::<Result<Vec<_>>>()?;
3129
3130            let proto_schema: protobuf::Schema =
3131                source_conf.original_schema().as_ref().try_into()?;
3132
3133            let proto_projection = source_conf
3134                .projection()
3135                .as_ref()
3136                .map_or_else(Vec::new, |v| {
3137                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
3138                });
3139
3140            let proto_sort_information = source_conf
3141                .sort_information()
3142                .iter()
3143                .map(|ordering| {
3144                    let sort_exprs = serialize_physical_sort_exprs(
3145                        ordering.to_owned(),
3146                        codec,
3147                        proto_converter,
3148                    )?;
3149                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
3150                        physical_sort_expr_nodes: sort_exprs,
3151                    })
3152                })
3153                .collect::<Result<Vec<_>, _>>()?;
3154
3155            return Ok(Some(protobuf::PhysicalPlanNode {
3156                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
3157                    protobuf::MemoryScanExecNode {
3158                        partitions: proto_partitions,
3159                        schema: Some(proto_schema),
3160                        projection: proto_projection,
3161                        sort_information: proto_sort_information,
3162                        show_sizes: source_conf.show_sizes(),
3163                        fetch: source_conf.fetch().map(|f| f as u32),
3164                    },
3165                )),
3166            }));
3167        }
3168
3169        Ok(None)
3170    }
3171
3172    fn try_from_coalesce_partitions_exec(
3173        exec: &CoalescePartitionsExec,
3174        codec: &dyn PhysicalExtensionCodec,
3175        proto_converter: &dyn PhysicalProtoConverterExtension,
3176    ) -> Result<Self> {
3177        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3178            exec.input().to_owned(),
3179            codec,
3180            proto_converter,
3181        )?;
3182        Ok(protobuf::PhysicalPlanNode {
3183            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
3184                protobuf::CoalescePartitionsExecNode {
3185                    input: Some(Box::new(input)),
3186                    fetch: exec.fetch().map(|f| f as u32),
3187                },
3188            ))),
3189        })
3190    }
3191
3192    fn try_from_repartition_exec(
3193        exec: &RepartitionExec,
3194        codec: &dyn PhysicalExtensionCodec,
3195        proto_converter: &dyn PhysicalProtoConverterExtension,
3196    ) -> Result<Self> {
3197        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3198            exec.input().to_owned(),
3199            codec,
3200            proto_converter,
3201        )?;
3202
3203        let pb_partitioning =
3204            serialize_partitioning(exec.partitioning(), codec, proto_converter)?;
3205
3206        Ok(protobuf::PhysicalPlanNode {
3207            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
3208                protobuf::RepartitionExecNode {
3209                    input: Some(Box::new(input)),
3210                    partitioning: Some(pb_partitioning),
3211                    preserve_order: exec.preserve_order(),
3212                },
3213            ))),
3214        })
3215    }
3216
3217    fn try_from_sort_exec(
3218        exec: &SortExec,
3219        codec: &dyn PhysicalExtensionCodec,
3220        proto_converter: &dyn PhysicalProtoConverterExtension,
3221    ) -> Result<Self> {
3222        let input = proto_converter.execution_plan_to_proto(exec.input(), codec)?;
3223        let expr = exec
3224            .expr()
3225            .iter()
3226            .map(|expr| {
3227                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3228                    expr: Some(Box::new(
3229                        proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3230                    )),
3231                    asc: !expr.options.descending,
3232                    nulls_first: expr.options.nulls_first,
3233                });
3234                Ok(protobuf::PhysicalExprNode {
3235                    expr_id: None,
3236                    expr_type: Some(ExprType::Sort(sort_expr)),
3237                })
3238            })
3239            .collect::<Result<Vec<_>>>()?;
3240        let dynamic_filter = exec
3241            .dynamic_filter_expr()
3242            .map(|df| {
3243                let df_expr: Arc<dyn PhysicalExpr> = df as Arc<dyn PhysicalExpr>;
3244                proto_converter.physical_expr_to_proto(&df_expr, codec)
3245            })
3246            .transpose()?;
3247
3248        Ok(protobuf::PhysicalPlanNode {
3249            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
3250                protobuf::SortExecNode {
3251                    input: Some(Box::new(input)),
3252                    expr,
3253                    fetch: match exec.fetch() {
3254                        Some(n) => n as i64,
3255                        _ => -1,
3256                    },
3257                    preserve_partitioning: exec.preserve_partitioning(),
3258                    dynamic_filter,
3259                },
3260            ))),
3261        })
3262    }
3263
3264    fn try_from_union_exec(
3265        union: &UnionExec,
3266        codec: &dyn PhysicalExtensionCodec,
3267        proto_converter: &dyn PhysicalProtoConverterExtension,
3268    ) -> Result<Self> {
3269        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3270        for input in union.inputs() {
3271            inputs.push(
3272                protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3273                    input.to_owned(),
3274                    codec,
3275                    proto_converter,
3276                )?,
3277            );
3278        }
3279        Ok(protobuf::PhysicalPlanNode {
3280            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
3281                inputs,
3282            })),
3283        })
3284    }
3285
3286    fn try_from_interleave_exec(
3287        interleave: &InterleaveExec,
3288        codec: &dyn PhysicalExtensionCodec,
3289        proto_converter: &dyn PhysicalProtoConverterExtension,
3290    ) -> Result<Self> {
3291        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3292        for input in interleave.inputs() {
3293            inputs.push(
3294                protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3295                    input.to_owned(),
3296                    codec,
3297                    proto_converter,
3298                )?,
3299            );
3300        }
3301        Ok(protobuf::PhysicalPlanNode {
3302            physical_plan_type: Some(PhysicalPlanType::Interleave(
3303                protobuf::InterleaveExecNode { inputs },
3304            )),
3305        })
3306    }
3307
3308    fn try_from_sort_preserving_merge_exec(
3309        exec: &SortPreservingMergeExec,
3310        codec: &dyn PhysicalExtensionCodec,
3311        proto_converter: &dyn PhysicalProtoConverterExtension,
3312    ) -> Result<Self> {
3313        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3314            exec.input().to_owned(),
3315            codec,
3316            proto_converter,
3317        )?;
3318        let expr = exec
3319            .expr()
3320            .iter()
3321            .map(|expr| {
3322                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3323                    expr: Some(Box::new(
3324                        proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3325                    )),
3326                    asc: !expr.options.descending,
3327                    nulls_first: expr.options.nulls_first,
3328                });
3329                Ok(protobuf::PhysicalExprNode {
3330                    expr_id: None,
3331                    expr_type: Some(ExprType::Sort(sort_expr)),
3332                })
3333            })
3334            .collect::<Result<Vec<_>>>()?;
3335        Ok(protobuf::PhysicalPlanNode {
3336            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
3337                protobuf::SortPreservingMergeExecNode {
3338                    input: Some(Box::new(input)),
3339                    expr,
3340                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
3341                },
3342            ))),
3343        })
3344    }
3345
3346    fn try_from_nested_loop_join_exec(
3347        exec: &NestedLoopJoinExec,
3348        codec: &dyn PhysicalExtensionCodec,
3349        proto_converter: &dyn PhysicalProtoConverterExtension,
3350    ) -> Result<Self> {
3351        let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3352            exec.left().to_owned(),
3353            codec,
3354            proto_converter,
3355        )?;
3356        let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3357            exec.right().to_owned(),
3358            codec,
3359            proto_converter,
3360        )?;
3361
3362        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
3363        let filter = exec
3364            .filter()
3365            .as_ref()
3366            .map(|f| {
3367                let expression =
3368                    proto_converter.physical_expr_to_proto(f.expression(), codec)?;
3369                let column_indices = f
3370                    .column_indices()
3371                    .iter()
3372                    .map(|i| {
3373                        let side: protobuf::JoinSide = i.side.to_owned().into();
3374                        protobuf::ColumnIndex {
3375                            index: i.index as u32,
3376                            side: side.into(),
3377                        }
3378                    })
3379                    .collect();
3380                let schema = f.schema().as_ref().try_into()?;
3381                Ok(protobuf::JoinFilter {
3382                    expression: Some(expression),
3383                    column_indices,
3384                    schema: Some(schema),
3385                })
3386            })
3387            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
3388
3389        Ok(protobuf::PhysicalPlanNode {
3390            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
3391                protobuf::NestedLoopJoinExecNode {
3392                    left: Some(Box::new(left)),
3393                    right: Some(Box::new(right)),
3394                    join_type: join_type.into(),
3395                    filter,
3396                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
3397                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
3398                    }),
3399                },
3400            ))),
3401        })
3402    }
3403
3404    fn try_from_window_agg_exec(
3405        exec: &WindowAggExec,
3406        codec: &dyn PhysicalExtensionCodec,
3407        proto_converter: &dyn PhysicalProtoConverterExtension,
3408    ) -> Result<Self> {
3409        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3410            exec.input().to_owned(),
3411            codec,
3412            proto_converter,
3413        )?;
3414
3415        let window_expr = exec
3416            .window_expr()
3417            .iter()
3418            .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3419            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3420
3421        let partition_keys = exec
3422            .partition_keys()
3423            .iter()
3424            .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3425            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3426
3427        Ok(protobuf::PhysicalPlanNode {
3428            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3429                protobuf::WindowAggExecNode {
3430                    input: Some(Box::new(input)),
3431                    window_expr,
3432                    partition_keys,
3433                    input_order_mode: None,
3434                },
3435            ))),
3436        })
3437    }
3438
3439    fn try_from_bounded_window_agg_exec(
3440        exec: &BoundedWindowAggExec,
3441        codec: &dyn PhysicalExtensionCodec,
3442        proto_converter: &dyn PhysicalProtoConverterExtension,
3443    ) -> Result<Self> {
3444        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3445            exec.input().to_owned(),
3446            codec,
3447            proto_converter,
3448        )?;
3449
3450        let window_expr = exec
3451            .window_expr()
3452            .iter()
3453            .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3454            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3455
3456        let partition_keys = exec
3457            .partition_keys()
3458            .iter()
3459            .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3460            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3461
3462        let input_order_mode = match &exec.input_order_mode {
3463            InputOrderMode::Linear => {
3464                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
3465            }
3466            InputOrderMode::PartiallySorted(columns) => {
3467                window_agg_exec_node::InputOrderMode::PartiallySorted(
3468                    protobuf::PartiallySortedInputOrderMode {
3469                        columns: columns.iter().map(|c| *c as u64).collect(),
3470                    },
3471                )
3472            }
3473            InputOrderMode::Sorted => {
3474                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
3475            }
3476        };
3477
3478        Ok(protobuf::PhysicalPlanNode {
3479            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3480                protobuf::WindowAggExecNode {
3481                    input: Some(Box::new(input)),
3482                    window_expr,
3483                    partition_keys,
3484                    input_order_mode: Some(input_order_mode),
3485                },
3486            ))),
3487        })
3488    }
3489
3490    fn try_from_data_sink_exec(
3491        exec: &DataSinkExec,
3492        codec: &dyn PhysicalExtensionCodec,
3493        proto_converter: &dyn PhysicalProtoConverterExtension,
3494    ) -> Result<Option<Self>> {
3495        let input: protobuf::PhysicalPlanNode =
3496            protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3497                exec.input().to_owned(),
3498                codec,
3499                proto_converter,
3500            )?;
3501        let sort_order = match exec.sort_order() {
3502            Some(requirements) => {
3503                let expr = requirements
3504                    .iter()
3505                    .map(|requirement| {
3506                        let expr: PhysicalSortExpr = requirement.to_owned().into();
3507                        let sort_expr = protobuf::PhysicalSortExprNode {
3508                            expr: Some(Box::new(
3509                                proto_converter
3510                                    .physical_expr_to_proto(&expr.expr, codec)?,
3511                            )),
3512                            asc: !expr.options.descending,
3513                            nulls_first: expr.options.nulls_first,
3514                        };
3515                        Ok(sort_expr)
3516                    })
3517                    .collect::<Result<Vec<_>>>()?;
3518                Some(protobuf::PhysicalSortExprNodeCollection {
3519                    physical_sort_expr_nodes: expr,
3520                })
3521            }
3522            None => None,
3523        };
3524
3525        if let Some(sink) = exec.sink().downcast_ref::<JsonSink>() {
3526            return Ok(Some(protobuf::PhysicalPlanNode {
3527                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
3528                    protobuf::JsonSinkExecNode {
3529                        input: Some(Box::new(input)),
3530                        sink: Some(sink.try_into()?),
3531                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3532                        sort_order,
3533                    },
3534                ))),
3535            }));
3536        }
3537
3538        if let Some(sink) = exec.sink().downcast_ref::<CsvSink>() {
3539            return Ok(Some(protobuf::PhysicalPlanNode {
3540                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
3541                    protobuf::CsvSinkExecNode {
3542                        input: Some(Box::new(input)),
3543                        sink: Some(sink.try_into()?),
3544                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3545                        sort_order,
3546                    },
3547                ))),
3548            }));
3549        }
3550
3551        #[cfg(feature = "parquet")]
3552        if let Some(sink) = exec.sink().downcast_ref::<ParquetSink>() {
3553            return Ok(Some(protobuf::PhysicalPlanNode {
3554                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
3555                    protobuf::ParquetSinkExecNode {
3556                        input: Some(Box::new(input)),
3557                        sink: Some(sink.try_into()?),
3558                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3559                        sort_order,
3560                    },
3561                ))),
3562            }));
3563        }
3564
3565        // If unknown DataSink then let extension handle it
3566        Ok(None)
3567    }
3568
3569    fn try_from_unnest_exec(
3570        exec: &UnnestExec,
3571        codec: &dyn PhysicalExtensionCodec,
3572        proto_converter: &dyn PhysicalProtoConverterExtension,
3573    ) -> Result<Self> {
3574        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3575            exec.input().to_owned(),
3576            codec,
3577            proto_converter,
3578        )?;
3579
3580        Ok(protobuf::PhysicalPlanNode {
3581            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3582                protobuf::UnnestExecNode {
3583                    input: Some(Box::new(input)),
3584                    schema: Some(exec.schema().try_into()?),
3585                    list_type_columns: exec
3586                        .list_column_indices()
3587                        .iter()
3588                        .map(|c| ProtoListUnnest {
3589                            index_in_input_schema: c.index_in_input_schema as _,
3590                            depth: c.depth as _,
3591                        })
3592                        .collect(),
3593                    struct_type_columns: exec
3594                        .struct_column_indices()
3595                        .iter()
3596                        .map(|c| *c as _)
3597                        .collect(),
3598                    options: Some(exec.options().into()),
3599                },
3600            ))),
3601        })
3602    }
3603
3604    fn try_from_cooperative_exec(
3605        exec: &CooperativeExec,
3606        codec: &dyn PhysicalExtensionCodec,
3607        proto_converter: &dyn PhysicalProtoConverterExtension,
3608    ) -> Result<Self> {
3609        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3610            exec.input().to_owned(),
3611            codec,
3612            proto_converter,
3613        )?;
3614
3615        Ok(protobuf::PhysicalPlanNode {
3616            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3617                protobuf::CooperativeExecNode {
3618                    input: Some(Box::new(input)),
3619                },
3620            ))),
3621        })
3622    }
3623
3624    fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3625        match name {
3626            "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3627            "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3628            _ => internal_err!("unknown name: {name}"),
3629        }
3630    }
3631
    /// Tries to encode a [`LazyMemoryExec`] as a `GenerateSeries` plan node.
    ///
    /// Only an exec with exactly one generator is handled, and that generator
    /// must be an `Empty`, a `GenericSeriesState<i64>`, or a
    /// `GenericSeriesState<TimestampValue>`. Every other case returns
    /// `Ok(None)` so the caller can fall back to another encoding.
    ///
    /// # Errors
    ///
    /// Fails if the schema cannot be converted or the generator's name is not
    /// one accepted by `str_to_generate_series_name`.
    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
        let generators = exec.generators();

        // Only single-generator execs have a GenerateSeries encoding.
        let [generator] = generators.as_slice() else {
            return Ok(None);
        };

        // Read access to the generator for the downcasts below.
        let generator_guard = generator.read();

        // Try each supported generator type in turn; the first match returns.
        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: 8192, // fixed value; not read from the generator
                args: Some(protobuf::generate_series_node::Args::ContainsNull(
                    protobuf::GenerateSeriesArgsContainsNull {
                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Integer series: start/end/step are copied verbatim.
        if let Some(int_64) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<i64>>()
        {
            let schema = exec.schema();
            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                // NOTE(review): `as u32` truncates batch sizes above u32::MAX.
                target_batch_size: int_64.batch_size() as u32,
                args: Some(protobuf::generate_series_node::Args::Int64Args(
                    protobuf::GenerateSeriesArgsInt64 {
                        start: *int_64.start(),
                        end: *int_64.end(),
                        step: *int_64.step(),
                        include_end: int_64.include_end(),
                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
                    },
                )),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Timestamp series: the raw start/end values and the month/day/nano
        // step interval are serialized.
        if let Some(timestamp_args) = generator_guard
            .as_any()
            .downcast_ref::<GenericSeriesState<TimestampValue>>()
        {
            let schema = exec.schema();

            let start = timestamp_args.start().value();
            let end = timestamp_args.end().value();

            let step_value = timestamp_args.step();

            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
                months: step_value.months,
                days: step_value.days,
                nanos: step_value.nanoseconds,
            });
            let include_end = timestamp_args.include_end();
            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;

            // The timezone is taken from `current()`. With a timezone the
            // series is sent as TimestampArgs; without one it is sent as
            // DateArgs. NOTE(review): this presumably relies on tz-less
            // timestamp series originating from date inputs; confirm that the
            // decoder reproduces the same output type (the schema is sent
            // separately).
            let args = match timestamp_args.current().tz_str() {
                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
                    protobuf::GenerateSeriesArgsTimestamp {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                        tz: Some(tz.to_string()),
                    },
                ),
                None => protobuf::generate_series_node::Args::DateArgs(
                    protobuf::GenerateSeriesArgsDate {
                        start,
                        end,
                        step,
                        include_end,
                        name,
                    },
                ),
            };

            let node = protobuf::GenerateSeriesNode {
                schema: Some(schema.as_ref().try_into()?),
                target_batch_size: timestamp_args.batch_size() as u32,
                args: Some(args),
            };

            return Ok(Some(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
            }));
        }

        // Unsupported generator type: let the caller try something else.
        Ok(None)
    }
3738
3739    fn try_from_async_func_exec(
3740        exec: &AsyncFuncExec,
3741        codec: &dyn PhysicalExtensionCodec,
3742        proto_converter: &dyn PhysicalProtoConverterExtension,
3743    ) -> Result<Self> {
3744        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3745            Arc::clone(exec.input()),
3746            codec,
3747            proto_converter,
3748        )?;
3749
3750        let mut async_exprs = vec![];
3751        let mut async_expr_names = vec![];
3752
3753        for async_expr in exec.async_exprs() {
3754            async_exprs
3755                .push(proto_converter.physical_expr_to_proto(&async_expr.func, codec)?);
3756            async_expr_names.push(async_expr.name.clone())
3757        }
3758
3759        Ok(protobuf::PhysicalPlanNode {
3760            physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
3761                protobuf::AsyncFuncExecNode {
3762                    input: Some(Box::new(input)),
3763                    async_exprs,
3764                    async_expr_names,
3765                },
3766            ))),
3767        })
3768    }
3769
3770    fn try_from_buffer_exec(
3771        exec: &BufferExec,
3772        extension_codec: &dyn PhysicalExtensionCodec,
3773        proto_converter: &dyn PhysicalProtoConverterExtension,
3774    ) -> Result<Self> {
3775        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3776            Arc::clone(exec.input()),
3777            extension_codec,
3778            proto_converter,
3779        )?;
3780
3781        Ok(protobuf::PhysicalPlanNode {
3782            physical_plan_type: Some(PhysicalPlanType::Buffer(Box::new(
3783                protobuf::BufferExecNode {
3784                    input: Some(Box::new(input)),
3785                    capacity: exec.capacity() as u64,
3786                },
3787            ))),
3788        })
3789    }
3790
3791    fn try_from_scalar_subquery_exec(
3792        exec: &ScalarSubqueryExec,
3793        codec: &dyn PhysicalExtensionCodec,
3794        proto_converter: &dyn PhysicalProtoConverterExtension,
3795    ) -> Result<Self> {
3796        let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3797            Arc::clone(exec.input()),
3798            codec,
3799            proto_converter,
3800        )?;
3801        let subqueries = exec
3802            .subqueries()
3803            .iter()
3804            .map(|sq| {
3805                protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3806                    Arc::clone(&sq.plan),
3807                    codec,
3808                    proto_converter,
3809                )
3810            })
3811            .collect::<Result<Vec<_>>>()?;
3812
3813        Ok(protobuf::PhysicalPlanNode {
3814            physical_plan_type: Some(PhysicalPlanType::ScalarSubquery(Box::new(
3815                protobuf::ScalarSubqueryExecNode {
3816                    input: Some(Box::new(input)),
3817                    subqueries,
3818                },
3819            ))),
3820        })
3821    }
3822}
3823
3824pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
3825    fn try_decode(buf: &[u8]) -> Result<Self>
3826    where
3827        Self: Sized;
3828
3829    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
3830    where
3831        B: BufMut,
3832        Self: Sized;
3833
3834    fn try_into_physical_plan(
3835        &self,
3836        ctx: &TaskContext,
3837
3838        codec: &dyn PhysicalExtensionCodec,
3839    ) -> Result<Arc<dyn ExecutionPlan>>;
3840
3841    fn try_from_physical_plan(
3842        plan: Arc<dyn ExecutionPlan>,
3843        codec: &dyn PhysicalExtensionCodec,
3844    ) -> Result<Self>
3845    where
3846        Self: Sized;
3847}
3848
/// Hooks for encoding and decoding physical plan pieces (plan nodes,
/// expressions and scalar/aggregate/window functions) to and from raw bytes.
///
/// `try_decode` and `try_encode` have no default and must be implemented.
/// The remaining methods have defaults:
/// - the `try_decode_udf` / `try_decode_udaf` / `try_decode_udwf` /
///   `try_decode_expr` defaults return a "not implemented" error;
/// - the `try_encode_udf` / `try_encode_udaf` / `try_encode_udwf` defaults
///   succeed without writing anything to `buf`;
/// - the `try_encode_expr` default returns a "not implemented" error.
pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any {
    /// Decodes a plan node from `buf`. `inputs` are presumably the node's
    /// already-decoded children (TODO confirm ordering against the encoder).
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes the plan node `node` into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a scalar UDF named `name`. The default returns a
    /// "not implemented" error that mentions `name`.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes extra state for a scalar UDF. The default writes nothing and
    /// succeeds.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a physical expression from `_buf` given its decoded
    /// `_inputs`. The default returns a "not implemented" error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a physical expression into `_buf`. The default returns a
    /// "not implemented" error.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes an aggregate UDF named `name`. The default returns a
    /// "not implemented" error that mentions `name`.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes extra state for an aggregate UDF. The default writes nothing
    /// and succeeds.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a window UDF named `name`. The default returns a
    /// "not implemented" error that mentions `name`.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes extra state for a window UDF. The default writes nothing and
    /// succeeds.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
3901
3902#[derive(Debug)]
3903pub struct DefaultPhysicalExtensionCodec {}
3904
3905impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
3906    fn try_decode(
3907        &self,
3908        _buf: &[u8],
3909        _inputs: &[Arc<dyn ExecutionPlan>],
3910        _ctx: &TaskContext,
3911    ) -> Result<Arc<dyn ExecutionPlan>> {
3912        not_impl_err!("PhysicalExtensionCodec is not provided")
3913    }
3914
3915    fn try_encode(
3916        &self,
3917        _node: Arc<dyn ExecutionPlan>,
3918        _buf: &mut Vec<u8>,
3919    ) -> Result<()> {
3920        not_impl_err!("PhysicalExtensionCodec is not provided")
3921    }
3922}
3923
3924/// Controls the conversion of physical plans and expressions to and from their
3925/// Protobuf variants. Using this trait, users can perform optimizations on the
3926/// conversion process or collect performance metrics.
3927pub trait PhysicalProtoConverterExtension {
3928    fn proto_to_execution_plan(
3929        &self,
3930        proto: &protobuf::PhysicalPlanNode,
3931        ctx: &PhysicalPlanDecodeContext<'_>,
3932    ) -> Result<Arc<dyn ExecutionPlan>>;
3933
3934    fn default_proto_to_execution_plan(
3935        &self,
3936        proto: &protobuf::PhysicalPlanNode,
3937        ctx: &PhysicalPlanDecodeContext<'_>,
3938    ) -> Result<Arc<dyn ExecutionPlan>>
3939    where
3940        Self: Sized,
3941    {
3942        proto.try_into_physical_plan_with_context(ctx, self)
3943    }
3944
3945    fn execution_plan_to_proto(
3946        &self,
3947        plan: &Arc<dyn ExecutionPlan>,
3948        codec: &dyn PhysicalExtensionCodec,
3949    ) -> Result<protobuf::PhysicalPlanNode>;
3950
3951    fn proto_to_physical_expr(
3952        &self,
3953        proto: &protobuf::PhysicalExprNode,
3954        input_schema: &Schema,
3955        ctx: &PhysicalPlanDecodeContext<'_>,
3956    ) -> Result<Arc<dyn PhysicalExpr>>;
3957
3958    fn default_proto_to_physical_expr(
3959        &self,
3960        proto: &protobuf::PhysicalExprNode,
3961        input_schema: &Schema,
3962        ctx: &PhysicalPlanDecodeContext<'_>,
3963    ) -> Result<Arc<dyn PhysicalExpr>>
3964    where
3965        Self: Sized,
3966    {
3967        parse_physical_expr_with_converter(proto, input_schema, ctx, self)
3968    }
3969
3970    fn physical_expr_to_proto(
3971        &self,
3972        expr: &Arc<dyn PhysicalExpr>,
3973        codec: &dyn PhysicalExtensionCodec,
3974    ) -> Result<protobuf::PhysicalExprNode>;
3975}
3976
/// Envelope written by `ComposedPhysicalExtensionCodec` around every payload it
/// encodes. It records which inner codec produced the payload, so decoding can
/// be routed back to that same codec.
///
/// This is a hand-written prost message: the field tags below are part of the
/// serialized format and must not change.
///
/// NOTE(review): prost presumably omits default-valued fields, so a tuple with
/// position 0 and an empty blob may serialize to zero bytes — confirm before
/// relying on that.
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// Index, within the composed codec list, of the codec that encoded
    /// `blob`; used on decode to select the same codec.
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// The bytes produced by that inner codec.
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
3989
3990pub struct DefaultPhysicalProtoConverter {}
3991
3992impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
3993    fn proto_to_execution_plan(
3994        &self,
3995        proto: &protobuf::PhysicalPlanNode,
3996        ctx: &PhysicalPlanDecodeContext<'_>,
3997    ) -> Result<Arc<dyn ExecutionPlan>> {
3998        proto.try_into_physical_plan_with_context(ctx, self)
3999    }
4000
4001    fn execution_plan_to_proto(
4002        &self,
4003        plan: &Arc<dyn ExecutionPlan>,
4004        codec: &dyn PhysicalExtensionCodec,
4005    ) -> Result<protobuf::PhysicalPlanNode>
4006    where
4007        Self: Sized,
4008    {
4009        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
4010            Arc::clone(plan),
4011            codec,
4012            self,
4013        )
4014    }
4015
4016    fn proto_to_physical_expr(
4017        &self,
4018        proto: &protobuf::PhysicalExprNode,
4019        input_schema: &Schema,
4020        ctx: &PhysicalPlanDecodeContext<'_>,
4021    ) -> Result<Arc<dyn PhysicalExpr>>
4022    where
4023        Self: Sized,
4024    {
4025        // Default implementation calls the free function
4026        parse_physical_expr_with_converter(proto, input_schema, ctx, self)
4027    }
4028
4029    fn physical_expr_to_proto(
4030        &self,
4031        expr: &Arc<dyn PhysicalExpr>,
4032        codec: &dyn PhysicalExtensionCodec,
4033    ) -> Result<protobuf::PhysicalExprNode> {
4034        serialize_physical_expr_with_converter(expr, codec, self)
4035    }
4036}
4037
4038/// Internal deserializer that caches expressions by their `expression_id()` so
4039/// multiple occurrences of the same expression are deduped.
4040#[derive(Default)]
4041struct DeduplicatingDeserializer {
4042    /// Cache mapping expression_id to deserialized expressions.
4043    cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
4044}
4045
4046impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
4047    fn proto_to_execution_plan(
4048        &self,
4049        proto: &protobuf::PhysicalPlanNode,
4050        ctx: &PhysicalPlanDecodeContext<'_>,
4051    ) -> Result<Arc<dyn ExecutionPlan>> {
4052        proto.try_into_physical_plan_with_context(ctx, self)
4053    }
4054
4055    fn execution_plan_to_proto(
4056        &self,
4057        _plan: &Arc<dyn ExecutionPlan>,
4058        _codec: &dyn PhysicalExtensionCodec,
4059    ) -> Result<protobuf::PhysicalPlanNode>
4060    where
4061        Self: Sized,
4062    {
4063        internal_err!("DeduplicatingDeserializer cannot serialize execution plans")
4064    }
4065
4066    fn proto_to_physical_expr(
4067        &self,
4068        proto: &protobuf::PhysicalExprNode,
4069        input_schema: &Schema,
4070        ctx: &PhysicalPlanDecodeContext<'_>,
4071    ) -> Result<Arc<dyn PhysicalExpr>>
4072    where
4073        Self: Sized,
4074    {
4075        // `expr_id` is the generic identity slot on `PhysicalExprNode`.
4076        // The default serializer populates it from `PhysicalExpr::expression_id`.
4077        // A missing id means this expression type doesn't participate in deduping.
4078        let Some(id) = proto.expr_id else {
4079            return parse_physical_expr_with_converter(proto, input_schema, ctx, self);
4080        };
4081
4082        let parsed = parse_physical_expr_with_converter(proto, input_schema, ctx, self)?;
4083
4084        let mut cache = self.cache.borrow_mut();
4085        if let Some(cached) = cache.get(&id) {
4086            // Since expressions may manage their own internal state when deriving
4087            // expressions via `with_new_children`, we use `with_new_children`
4088            // to opt into the same behavior.
4089            //
4090            // For example, one `DynamicFilterPhysicalExpr` may be derived from
4091            // another resulting in shared references. Using `with_new_children`
4092            // is meant to preserve those references.
4093            let children: Vec<_> = parsed.children().into_iter().cloned().collect();
4094            return Arc::clone(cached).with_new_children(children);
4095        }
4096
4097        cache.insert(id, Arc::clone(&parsed));
4098        Ok(parsed)
4099    }
4100
4101    fn physical_expr_to_proto(
4102        &self,
4103        _expr: &Arc<dyn PhysicalExpr>,
4104        _codec: &dyn PhysicalExtensionCodec,
4105    ) -> Result<protobuf::PhysicalExprNode> {
4106        internal_err!("DeduplicatingDeserializer cannot serialize physical expressions")
4107    }
4108}
4109
4110/// A proto converter that deduplicates [`PhysicalExpr`] by [`PhysicalExpr::expression_id`].
4111/// This helps preserve referential integrity when deserializing [`ExecutionPlan`]s
4112/// which may contain multiple occurrences of the same [`PhysicalExpr`] (ex. when
4113/// [`DynamicFilterPhysicalExpr`] are pushed down, it is important to preserve
4114/// referential integrity).
4115///
4116///
4117/// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html
4118#[derive(Debug, Default, Clone, Copy)]
4119pub struct DeduplicatingProtoConverter {}
4120
4121impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
4122    fn proto_to_execution_plan(
4123        &self,
4124        proto: &protobuf::PhysicalPlanNode,
4125        ctx: &PhysicalPlanDecodeContext<'_>,
4126    ) -> Result<Arc<dyn ExecutionPlan>> {
4127        let deserializer = DeduplicatingDeserializer::default();
4128        proto.try_into_physical_plan_with_context(ctx, &deserializer)
4129    }
4130
4131    fn execution_plan_to_proto(
4132        &self,
4133        plan: &Arc<dyn ExecutionPlan>,
4134        codec: &dyn PhysicalExtensionCodec,
4135    ) -> Result<protobuf::PhysicalPlanNode>
4136    where
4137        Self: Sized,
4138    {
4139        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
4140            Arc::clone(plan),
4141            codec,
4142            self,
4143        )
4144    }
4145
4146    fn proto_to_physical_expr(
4147        &self,
4148        proto: &protobuf::PhysicalExprNode,
4149        input_schema: &Schema,
4150        ctx: &PhysicalPlanDecodeContext<'_>,
4151    ) -> Result<Arc<dyn PhysicalExpr>>
4152    where
4153        Self: Sized,
4154    {
4155        let deserializer = DeduplicatingDeserializer::default();
4156        deserializer.proto_to_physical_expr(proto, input_schema, ctx)
4157    }
4158
4159    fn physical_expr_to_proto(
4160        &self,
4161        expr: &Arc<dyn PhysicalExpr>,
4162        codec: &dyn PhysicalExtensionCodec,
4163    ) -> Result<protobuf::PhysicalExprNode> {
4164        serialize_physical_expr_with_converter(expr, codec, self)
4165    }
4166}
4167
4168/// A PhysicalExtensionCodec that tries one of multiple inner codecs
4169/// until one works
4170#[derive(Debug)]
4171pub struct ComposedPhysicalExtensionCodec {
4172    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
4173}
4174
4175impl ComposedPhysicalExtensionCodec {
4176    // Position in this codecs list is important as it will be used for decoding.
4177    // If new codec is added it should go to last position.
4178    pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
4179        Self { codecs }
4180    }
4181
4182    fn decode_protobuf<R>(
4183        &self,
4184        buf: &[u8],
4185        decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
4186    ) -> Result<R> {
4187        let proto =
4188            DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;
4189
4190        let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
4191            internal_datafusion_err!("Can't find required codec in codec list"),
4192        )?;
4193
4194        decode(codec.as_ref(), &proto.blob)
4195    }
4196
4197    fn encode_protobuf(
4198        &self,
4199        buf: &mut Vec<u8>,
4200        mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
4201    ) -> Result<()> {
4202        let mut data = vec![];
4203        let mut last_err = None;
4204        let mut encoder_position = None;
4205
4206        // find the encoder
4207        for (position, codec) in self.codecs.iter().enumerate() {
4208            match encode(codec.as_ref(), &mut data) {
4209                Ok(_) => {
4210                    encoder_position = Some(position as u32);
4211                    break;
4212                }
4213                Err(err) => last_err = Some(err),
4214            }
4215        }
4216
4217        let encoder_position = encoder_position.ok_or_else(|| {
4218            last_err.unwrap_or_else(|| {
4219                DataFusionError::NotImplemented(
4220                    "Empty list of composed codecs".to_owned(),
4221                )
4222            })
4223        })?;
4224
4225        // encode with encoder position
4226        let proto = DataEncoderTuple {
4227            encoder_position,
4228            blob: data,
4229        };
4230        proto
4231            .encode(buf)
4232            .map_err(|e| internal_datafusion_err!("{e}"))
4233    }
4234}
4235
4236impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
4237    fn try_decode(
4238        &self,
4239        buf: &[u8],
4240        inputs: &[Arc<dyn ExecutionPlan>],
4241        ctx: &TaskContext,
4242    ) -> Result<Arc<dyn ExecutionPlan>> {
4243        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
4244    }
4245
4246    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
4247        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
4248    }
4249
4250    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
4251        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
4252    }
4253
4254    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
4255        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
4256    }
4257
4258    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
4259        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
4260    }
4261
4262    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
4263        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
4264    }
4265}
4266
4267fn into_physical_plan(
4268    node: &Option<Box<protobuf::PhysicalPlanNode>>,
4269    ctx: &PhysicalPlanDecodeContext<'_>,
4270    proto_converter: &dyn PhysicalProtoConverterExtension,
4271) -> Result<Arc<dyn ExecutionPlan>> {
4272    if let Some(field) = node {
4273        proto_converter.proto_to_execution_plan(field, ctx)
4274    } else {
4275        Err(proto_error("Missing required field in protobuf"))
4276    }
4277}