datafusion_proto/physical_plan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use self::from_proto::parse_protobuf_partitioning;
22use self::to_proto::{serialize_partitioning, serialize_physical_expr};
23use crate::common::{byte_to_string, str_to_byte};
24use crate::physical_plan::from_proto::{
25    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
26    parse_physical_window_expr, parse_protobuf_file_scan_config,
27    parse_protobuf_file_scan_schema, parse_record_batches,
28};
29use crate::physical_plan::to_proto::{
30    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
31    serialize_physical_sort_exprs, serialize_physical_window_expr,
32    serialize_record_batches,
33};
34use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
35use crate::protobuf::physical_expr_node::ExprType;
36use crate::protobuf::physical_plan_node::PhysicalPlanType;
37use crate::protobuf::{
38    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest, SortExprNode,
39    SortMergeJoinExecNode,
40};
41use crate::{convert_required, into_required};
42
43use datafusion::arrow::compute::SortOptions;
44use datafusion::arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
45use datafusion::catalog::memory::MemorySourceConfig;
46use datafusion::datasource::file_format::csv::CsvSink;
47use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
48use datafusion::datasource::file_format::json::JsonSink;
49#[cfg(feature = "parquet")]
50use datafusion::datasource::file_format::parquet::ParquetSink;
51#[cfg(feature = "avro")]
52use datafusion::datasource::physical_plan::AvroSource;
53#[cfg(feature = "parquet")]
54use datafusion::datasource::physical_plan::ParquetSource;
55use datafusion::datasource::physical_plan::{
56    CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
57};
58use datafusion::datasource::sink::DataSinkExec;
59use datafusion::datasource::source::{DataSource, DataSourceExec};
60use datafusion::execution::runtime_env::RuntimeEnv;
61use datafusion::execution::FunctionRegistry;
62use datafusion::functions_table::generate_series::{
63    Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
64};
65use datafusion::physical_expr::aggregate::AggregateExprBuilder;
66use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
67use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
68use datafusion::physical_plan::aggregates::AggregateMode;
69use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
70use datafusion::physical_plan::analyze::AnalyzeExec;
71use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
72use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
73use datafusion::physical_plan::coop::CooperativeExec;
74use datafusion::physical_plan::empty::EmptyExec;
75use datafusion::physical_plan::explain::ExplainExec;
76use datafusion::physical_plan::expressions::PhysicalSortExpr;
77use datafusion::physical_plan::filter::FilterExec;
78use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
79use datafusion::physical_plan::joins::{
80    CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode,
81    SymmetricHashJoinExec,
82};
83use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
84use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
85use datafusion::physical_plan::memory::LazyMemoryExec;
86use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
87use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
88use datafusion::physical_plan::repartition::RepartitionExec;
89use datafusion::physical_plan::sorts::sort::SortExec;
90use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
91use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
92use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
93use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
94use datafusion::physical_plan::{
95    ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
96};
97use datafusion_common::config::TableParquetOptions;
98use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
99use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
100
101use datafusion::prelude::SessionContext;
102use prost::bytes::BufMut;
103use prost::Message;
104
105pub mod from_proto;
106pub mod to_proto;
107
108impl AsExecutionPlan for protobuf::PhysicalPlanNode {
109    fn try_decode(buf: &[u8]) -> Result<Self>
110    where
111        Self: Sized,
112    {
113        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
114            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
115        })
116    }
117
118    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
119    where
120        B: BufMut,
121        Self: Sized,
122    {
123        self.encode(buf).map_err(|e| {
124            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
125        })
126    }
127
128    fn try_into_physical_plan(
129        &self,
130        ctx: &SessionContext,
131        runtime: &RuntimeEnv,
132        extension_codec: &dyn PhysicalExtensionCodec,
133    ) -> Result<Arc<dyn ExecutionPlan>> {
134        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
135            proto_error(format!(
136                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
137            ))
138        })?;
139        match plan {
140            PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan(
141                explain,
142                ctx,
143                runtime,
144                extension_codec,
145            ),
146            PhysicalPlanType::Projection(projection) => self
147                .try_into_projection_physical_plan(
148                    projection,
149                    ctx,
150                    runtime,
151                    extension_codec,
152                ),
153            PhysicalPlanType::Filter(filter) => {
154                self.try_into_filter_physical_plan(filter, ctx, runtime, extension_codec)
155            }
156            PhysicalPlanType::CsvScan(scan) => {
157                self.try_into_csv_scan_physical_plan(scan, ctx, runtime, extension_codec)
158            }
159            PhysicalPlanType::JsonScan(scan) => {
160                self.try_into_json_scan_physical_plan(scan, ctx, runtime, extension_codec)
161            }
162            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
163            PhysicalPlanType::ParquetScan(scan) => self
164                .try_into_parquet_scan_physical_plan(scan, ctx, runtime, extension_codec),
165            #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
166            PhysicalPlanType::AvroScan(scan) => {
167                self.try_into_avro_scan_physical_plan(scan, ctx, runtime, extension_codec)
168            }
169            PhysicalPlanType::MemoryScan(scan) => self
170                .try_into_memory_scan_physical_plan(scan, ctx, runtime, extension_codec),
171            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
172                .try_into_coalesce_batches_physical_plan(
173                    coalesce_batches,
174                    ctx,
175                    runtime,
176                    extension_codec,
177                ),
178            PhysicalPlanType::Merge(merge) => {
179                self.try_into_merge_physical_plan(merge, ctx, runtime, extension_codec)
180            }
181            PhysicalPlanType::Repartition(repart) => self
182                .try_into_repartition_physical_plan(
183                    repart,
184                    ctx,
185                    runtime,
186                    extension_codec,
187                ),
188            PhysicalPlanType::GlobalLimit(limit) => self
189                .try_into_global_limit_physical_plan(
190                    limit,
191                    ctx,
192                    runtime,
193                    extension_codec,
194                ),
195            PhysicalPlanType::LocalLimit(limit) => self
196                .try_into_local_limit_physical_plan(limit, ctx, runtime, extension_codec),
197            PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan(
198                window_agg,
199                ctx,
200                runtime,
201                extension_codec,
202            ),
203            PhysicalPlanType::Aggregate(hash_agg) => self
204                .try_into_aggregate_physical_plan(
205                    hash_agg,
206                    ctx,
207                    runtime,
208                    extension_codec,
209                ),
210            PhysicalPlanType::HashJoin(hashjoin) => self
211                .try_into_hash_join_physical_plan(
212                    hashjoin,
213                    ctx,
214                    runtime,
215                    extension_codec,
216                ),
217            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
218                .try_into_symmetric_hash_join_physical_plan(
219                    sym_join,
220                    ctx,
221                    runtime,
222                    extension_codec,
223                ),
224            PhysicalPlanType::Union(union) => {
225                self.try_into_union_physical_plan(union, ctx, runtime, extension_codec)
226            }
227            PhysicalPlanType::Interleave(interleave) => self
228                .try_into_interleave_physical_plan(
229                    interleave,
230                    ctx,
231                    runtime,
232                    extension_codec,
233                ),
234            PhysicalPlanType::CrossJoin(crossjoin) => self
235                .try_into_cross_join_physical_plan(
236                    crossjoin,
237                    ctx,
238                    runtime,
239                    extension_codec,
240                ),
241            PhysicalPlanType::Empty(empty) => {
242                self.try_into_empty_physical_plan(empty, ctx, runtime, extension_codec)
243            }
244            PhysicalPlanType::PlaceholderRow(placeholder) => self
245                .try_into_placeholder_row_physical_plan(
246                    placeholder,
247                    ctx,
248                    runtime,
249                    extension_codec,
250                ),
251            PhysicalPlanType::Sort(sort) => {
252                self.try_into_sort_physical_plan(sort, ctx, runtime, extension_codec)
253            }
254            PhysicalPlanType::SortPreservingMerge(sort) => self
255                .try_into_sort_preserving_merge_physical_plan(
256                    sort,
257                    ctx,
258                    runtime,
259                    extension_codec,
260                ),
261            PhysicalPlanType::Extension(extension) => self
262                .try_into_extension_physical_plan(
263                    extension,
264                    ctx,
265                    runtime,
266                    extension_codec,
267                ),
268            PhysicalPlanType::NestedLoopJoin(join) => self
269                .try_into_nested_loop_join_physical_plan(
270                    join,
271                    ctx,
272                    runtime,
273                    extension_codec,
274                ),
275            PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan(
276                analyze,
277                ctx,
278                runtime,
279                extension_codec,
280            ),
281            PhysicalPlanType::JsonSink(sink) => {
282                self.try_into_json_sink_physical_plan(sink, ctx, runtime, extension_codec)
283            }
284            PhysicalPlanType::CsvSink(sink) => {
285                self.try_into_csv_sink_physical_plan(sink, ctx, runtime, extension_codec)
286            }
287            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
288            PhysicalPlanType::ParquetSink(sink) => self
289                .try_into_parquet_sink_physical_plan(sink, ctx, runtime, extension_codec),
290            PhysicalPlanType::Unnest(unnest) => {
291                self.try_into_unnest_physical_plan(unnest, ctx, runtime, extension_codec)
292            }
293            PhysicalPlanType::Cooperative(cooperative) => self
294                .try_into_cooperative_physical_plan(
295                    cooperative,
296                    ctx,
297                    runtime,
298                    extension_codec,
299                ),
300            PhysicalPlanType::GenerateSeries(generate_series) => {
301                self.try_into_generate_series_physical_plan(generate_series)
302            }
303            PhysicalPlanType::SortMergeJoin(sort_join) => {
304                self.try_into_sort_join(sort_join, ctx, runtime, extension_codec)
305            }
306        }
307    }
308
309    fn try_from_physical_plan(
310        plan: Arc<dyn ExecutionPlan>,
311        extension_codec: &dyn PhysicalExtensionCodec,
312    ) -> Result<Self>
313    where
314        Self: Sized,
315    {
316        let plan_clone = Arc::clone(&plan);
317        let plan = plan.as_any();
318
319        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
320            return protobuf::PhysicalPlanNode::try_from_explain_exec(
321                exec,
322                extension_codec,
323            );
324        }
325
326        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
327            return protobuf::PhysicalPlanNode::try_from_projection_exec(
328                exec,
329                extension_codec,
330            );
331        }
332
333        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
334            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
335                exec,
336                extension_codec,
337            );
338        }
339
340        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
341            return protobuf::PhysicalPlanNode::try_from_filter_exec(
342                exec,
343                extension_codec,
344            );
345        }
346
347        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
348            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
349                limit,
350                extension_codec,
351            );
352        }
353
354        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
355            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
356                limit,
357                extension_codec,
358            );
359        }
360
361        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
362            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
363                exec,
364                extension_codec,
365            );
366        }
367
368        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
369            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
370                exec,
371                extension_codec,
372            );
373        }
374
375        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
376            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
377                exec,
378                extension_codec,
379            );
380        }
381
382        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
383            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
384                exec,
385                extension_codec,
386            );
387        }
388
389        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
390            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
391                exec,
392                extension_codec,
393            );
394        }
395
396        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
397            return protobuf::PhysicalPlanNode::try_from_empty_exec(
398                empty,
399                extension_codec,
400            );
401        }
402
403        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
404            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
405                empty,
406                extension_codec,
407            );
408        }
409
410        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
411            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
412                coalesce_batches,
413                extension_codec,
414            );
415        }
416
417        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
418            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
419                data_source_exec,
420                extension_codec,
421            )? {
422                return Ok(node);
423            }
424        }
425
426        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
427            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
428                exec,
429                extension_codec,
430            );
431        }
432
433        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
434            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
435                exec,
436                extension_codec,
437            );
438        }
439
440        if let Some(exec) = plan.downcast_ref::<SortExec>() {
441            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
442        }
443
444        if let Some(union) = plan.downcast_ref::<UnionExec>() {
445            return protobuf::PhysicalPlanNode::try_from_union_exec(
446                union,
447                extension_codec,
448            );
449        }
450
451        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
452            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
453                interleave,
454                extension_codec,
455            );
456        }
457
458        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
459            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
460                exec,
461                extension_codec,
462            );
463        }
464
465        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
466            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
467                exec,
468                extension_codec,
469            );
470        }
471
472        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
473            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
474                exec,
475                extension_codec,
476            );
477        }
478
479        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
480            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
481                exec,
482                extension_codec,
483            );
484        }
485
486        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
487            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
488                exec,
489                extension_codec,
490            )? {
491                return Ok(node);
492            }
493        }
494
495        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
496            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
497                exec,
498                extension_codec,
499            );
500        }
501
502        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
503            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
504                exec,
505                extension_codec,
506            );
507        }
508
509        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>() {
510            if let Some(node) =
511                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
512            {
513                return Ok(node);
514            }
515        }
516
517        let mut buf: Vec<u8> = vec![];
518        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
519            Ok(_) => {
520                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
521                    .children()
522                    .into_iter()
523                    .cloned()
524                    .map(|i| {
525                        protobuf::PhysicalPlanNode::try_from_physical_plan(
526                            i,
527                            extension_codec,
528                        )
529                    })
530                    .collect::<Result<_>>()?;
531
532                Ok(protobuf::PhysicalPlanNode {
533                    physical_plan_type: Some(PhysicalPlanType::Extension(
534                        protobuf::PhysicalExtensionNode { node: buf, inputs },
535                    )),
536                })
537            }
538            Err(e) => internal_err!(
539                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
540            ),
541        }
542    }
543}
544
545impl protobuf::PhysicalPlanNode {
546    fn try_into_explain_physical_plan(
547        &self,
548        explain: &protobuf::ExplainExecNode,
549        _ctx: &SessionContext,
550        _runtime: &RuntimeEnv,
551        _extension_codec: &dyn PhysicalExtensionCodec,
552    ) -> Result<Arc<dyn ExecutionPlan>> {
553        Ok(Arc::new(ExplainExec::new(
554            Arc::new(explain.schema.as_ref().unwrap().try_into()?),
555            explain
556                .stringified_plans
557                .iter()
558                .map(|plan| plan.into())
559                .collect(),
560            explain.verbose,
561        )))
562    }
563
564    fn try_into_projection_physical_plan(
565        &self,
566        projection: &protobuf::ProjectionExecNode,
567        ctx: &SessionContext,
568        runtime: &RuntimeEnv,
569        extension_codec: &dyn PhysicalExtensionCodec,
570    ) -> Result<Arc<dyn ExecutionPlan>> {
571        let input: Arc<dyn ExecutionPlan> =
572            into_physical_plan(&projection.input, ctx, runtime, extension_codec)?;
573        let exprs = projection
574            .expr
575            .iter()
576            .zip(projection.expr_name.iter())
577            .map(|(expr, name)| {
578                Ok((
579                    parse_physical_expr(
580                        expr,
581                        ctx,
582                        input.schema().as_ref(),
583                        extension_codec,
584                    )?,
585                    name.to_string(),
586                ))
587            })
588            .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
589        let proj_exprs: Vec<ProjectionExpr> = exprs
590            .into_iter()
591            .map(|(expr, alias)| ProjectionExpr { expr, alias })
592            .collect();
593        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
594    }
595
596    fn try_into_filter_physical_plan(
597        &self,
598        filter: &protobuf::FilterExecNode,
599        ctx: &SessionContext,
600        runtime: &RuntimeEnv,
601        extension_codec: &dyn PhysicalExtensionCodec,
602    ) -> Result<Arc<dyn ExecutionPlan>> {
603        let input: Arc<dyn ExecutionPlan> =
604            into_physical_plan(&filter.input, ctx, runtime, extension_codec)?;
605
606        let predicate = filter
607            .expr
608            .as_ref()
609            .map(|expr| {
610                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
611            })
612            .transpose()?
613            .ok_or_else(|| {
614                DataFusionError::Internal(
615                    "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
616                )
617            })?;
618
619        let filter_selectivity = filter.default_filter_selectivity.try_into();
620        let projection = if !filter.projection.is_empty() {
621            Some(
622                filter
623                    .projection
624                    .iter()
625                    .map(|i| *i as usize)
626                    .collect::<Vec<_>>(),
627            )
628        } else {
629            None
630        };
631
632        let filter =
633            FilterExec::try_new(predicate, input)?.with_projection(projection)?;
634        match filter_selectivity {
635            Ok(filter_selectivity) => Ok(Arc::new(
636                filter.with_default_selectivity(filter_selectivity)?,
637            )),
638            Err(_) => Err(DataFusionError::Internal(
639                "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
640            )),
641        }
642    }
643
644    fn try_into_csv_scan_physical_plan(
645        &self,
646        scan: &protobuf::CsvScanExecNode,
647        ctx: &SessionContext,
648        _runtime: &RuntimeEnv,
649        extension_codec: &dyn PhysicalExtensionCodec,
650    ) -> Result<Arc<dyn ExecutionPlan>> {
651        let escape =
652            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
653                &scan.optional_escape
654            {
655                Some(str_to_byte(escape, "escape")?)
656            } else {
657                None
658            };
659
660        let comment = if let Some(
661            protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
662        ) = &scan.optional_comment
663        {
664            Some(str_to_byte(comment, "comment")?)
665        } else {
666            None
667        };
668
669        let source = Arc::new(
670            CsvSource::new(
671                scan.has_header,
672                str_to_byte(&scan.delimiter, "delimiter")?,
673                0,
674            )
675            .with_escape(escape)
676            .with_comment(comment),
677        );
678
679        let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
680            scan.base_conf.as_ref().unwrap(),
681            ctx,
682            extension_codec,
683            source,
684        )?)
685        .with_newlines_in_values(scan.newlines_in_values)
686        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
687        .build();
688        Ok(DataSourceExec::from_data_source(conf))
689    }
690
691    fn try_into_json_scan_physical_plan(
692        &self,
693        scan: &protobuf::JsonScanExecNode,
694        ctx: &SessionContext,
695        _runtime: &RuntimeEnv,
696        extension_codec: &dyn PhysicalExtensionCodec,
697    ) -> Result<Arc<dyn ExecutionPlan>> {
698        let scan_conf = parse_protobuf_file_scan_config(
699            scan.base_conf.as_ref().unwrap(),
700            ctx,
701            extension_codec,
702            Arc::new(JsonSource::new()),
703        )?;
704        Ok(DataSourceExec::from_data_source(scan_conf))
705    }
706
707    #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
708    fn try_into_parquet_scan_physical_plan(
709        &self,
710        scan: &protobuf::ParquetScanExecNode,
711        ctx: &SessionContext,
712        _runtime: &RuntimeEnv,
713        extension_codec: &dyn PhysicalExtensionCodec,
714    ) -> Result<Arc<dyn ExecutionPlan>> {
715        #[cfg(feature = "parquet")]
716        {
717            let schema =
718                parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
719
720            // Check if there's a projection and use projected schema for predicate parsing
721            let base_conf = scan.base_conf.as_ref().unwrap();
722            let predicate_schema = if !base_conf.projection.is_empty() {
723                // Create projected schema for parsing the predicate
724                let projected_fields: Vec<_> = base_conf
725                    .projection
726                    .iter()
727                    .map(|&i| schema.field(i as usize).clone())
728                    .collect();
729                Arc::new(Schema::new(projected_fields))
730            } else {
731                schema
732            };
733
734            let predicate = scan
735                .predicate
736                .as_ref()
737                .map(|expr| {
738                    parse_physical_expr(
739                        expr,
740                        ctx,
741                        predicate_schema.as_ref(),
742                        extension_codec,
743                    )
744                })
745                .transpose()?;
746            let mut options = TableParquetOptions::default();
747
748            if let Some(table_options) = scan.parquet_options.as_ref() {
749                options = table_options.try_into()?;
750            }
751            let mut source = ParquetSource::new(options);
752
753            if let Some(predicate) = predicate {
754                source = source.with_predicate(predicate);
755            }
756            let base_config = parse_protobuf_file_scan_config(
757                base_conf,
758                ctx,
759                extension_codec,
760                Arc::new(source),
761            )?;
762            Ok(DataSourceExec::from_data_source(base_config))
763        }
764        #[cfg(not(feature = "parquet"))]
765        panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
766    }
767
768    #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
769    fn try_into_avro_scan_physical_plan(
770        &self,
771        scan: &protobuf::AvroScanExecNode,
772        ctx: &SessionContext,
773        _runtime: &RuntimeEnv,
774        extension_codec: &dyn PhysicalExtensionCodec,
775    ) -> Result<Arc<dyn ExecutionPlan>> {
776        #[cfg(feature = "avro")]
777        {
778            let conf = parse_protobuf_file_scan_config(
779                scan.base_conf.as_ref().unwrap(),
780                ctx,
781                extension_codec,
782                Arc::new(AvroSource::new()),
783            )?;
784            Ok(DataSourceExec::from_data_source(conf))
785        }
786        #[cfg(not(feature = "avro"))]
787        panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
788    }
789
790    fn try_into_memory_scan_physical_plan(
791        &self,
792        scan: &protobuf::MemoryScanExecNode,
793        ctx: &SessionContext,
794        _runtime: &RuntimeEnv,
795        extension_codec: &dyn PhysicalExtensionCodec,
796    ) -> Result<Arc<dyn ExecutionPlan>> {
797        let partitions = scan
798            .partitions
799            .iter()
800            .map(|p| parse_record_batches(p))
801            .collect::<Result<Vec<_>>>()?;
802
803        let proto_schema = scan.schema.as_ref().ok_or_else(|| {
804            DataFusionError::Internal(
805                "schema in MemoryScanExecNode is missing.".to_owned(),
806            )
807        })?;
808        let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
809
810        let projection = if !scan.projection.is_empty() {
811            Some(
812                scan.projection
813                    .iter()
814                    .map(|i| *i as usize)
815                    .collect::<Vec<_>>(),
816            )
817        } else {
818            None
819        };
820
821        let mut sort_information = vec![];
822        for ordering in &scan.sort_information {
823            let sort_exprs = parse_physical_sort_exprs(
824                &ordering.physical_sort_expr_nodes,
825                ctx,
826                &schema,
827                extension_codec,
828            )?;
829            sort_information.extend(LexOrdering::new(sort_exprs));
830        }
831
832        let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
833            .with_limit(scan.fetch.map(|f| f as usize))
834            .with_show_sizes(scan.show_sizes);
835
836        let source = source.try_with_sort_information(sort_information)?;
837
838        Ok(DataSourceExec::from_data_source(source))
839    }
840
841    fn try_into_coalesce_batches_physical_plan(
842        &self,
843        coalesce_batches: &protobuf::CoalesceBatchesExecNode,
844        ctx: &SessionContext,
845        runtime: &RuntimeEnv,
846        extension_codec: &dyn PhysicalExtensionCodec,
847    ) -> Result<Arc<dyn ExecutionPlan>> {
848        let input: Arc<dyn ExecutionPlan> =
849            into_physical_plan(&coalesce_batches.input, ctx, runtime, extension_codec)?;
850        Ok(Arc::new(
851            CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
852                .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
853        ))
854    }
855
856    fn try_into_merge_physical_plan(
857        &self,
858        merge: &protobuf::CoalescePartitionsExecNode,
859        ctx: &SessionContext,
860        runtime: &RuntimeEnv,
861        extension_codec: &dyn PhysicalExtensionCodec,
862    ) -> Result<Arc<dyn ExecutionPlan>> {
863        let input: Arc<dyn ExecutionPlan> =
864            into_physical_plan(&merge.input, ctx, runtime, extension_codec)?;
865        Ok(Arc::new(
866            CoalescePartitionsExec::new(input)
867                .with_fetch(merge.fetch.map(|f| f as usize)),
868        ))
869    }
870
871    fn try_into_repartition_physical_plan(
872        &self,
873        repart: &protobuf::RepartitionExecNode,
874        ctx: &SessionContext,
875        runtime: &RuntimeEnv,
876        extension_codec: &dyn PhysicalExtensionCodec,
877    ) -> Result<Arc<dyn ExecutionPlan>> {
878        let input: Arc<dyn ExecutionPlan> =
879            into_physical_plan(&repart.input, ctx, runtime, extension_codec)?;
880        let partitioning = parse_protobuf_partitioning(
881            repart.partitioning.as_ref(),
882            ctx,
883            input.schema().as_ref(),
884            extension_codec,
885        )?;
886        Ok(Arc::new(RepartitionExec::try_new(
887            input,
888            partitioning.unwrap(),
889        )?))
890    }
891
892    fn try_into_global_limit_physical_plan(
893        &self,
894        limit: &protobuf::GlobalLimitExecNode,
895        ctx: &SessionContext,
896        runtime: &RuntimeEnv,
897        extension_codec: &dyn PhysicalExtensionCodec,
898    ) -> Result<Arc<dyn ExecutionPlan>> {
899        let input: Arc<dyn ExecutionPlan> =
900            into_physical_plan(&limit.input, ctx, runtime, extension_codec)?;
901        let fetch = if limit.fetch >= 0 {
902            Some(limit.fetch as usize)
903        } else {
904            None
905        };
906        Ok(Arc::new(GlobalLimitExec::new(
907            input,
908            limit.skip as usize,
909            fetch,
910        )))
911    }
912
913    fn try_into_local_limit_physical_plan(
914        &self,
915        limit: &protobuf::LocalLimitExecNode,
916        ctx: &SessionContext,
917        runtime: &RuntimeEnv,
918        extension_codec: &dyn PhysicalExtensionCodec,
919    ) -> Result<Arc<dyn ExecutionPlan>> {
920        let input: Arc<dyn ExecutionPlan> =
921            into_physical_plan(&limit.input, ctx, runtime, extension_codec)?;
922        Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
923    }
924
925    fn try_into_window_physical_plan(
926        &self,
927        window_agg: &protobuf::WindowAggExecNode,
928        ctx: &SessionContext,
929        runtime: &RuntimeEnv,
930        extension_codec: &dyn PhysicalExtensionCodec,
931    ) -> Result<Arc<dyn ExecutionPlan>> {
932        let input: Arc<dyn ExecutionPlan> =
933            into_physical_plan(&window_agg.input, ctx, runtime, extension_codec)?;
934        let input_schema = input.schema();
935
936        let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
937            .window_expr
938            .iter()
939            .map(|window_expr| {
940                parse_physical_window_expr(
941                    window_expr,
942                    ctx,
943                    input_schema.as_ref(),
944                    extension_codec,
945                )
946            })
947            .collect::<Result<Vec<_>, _>>()?;
948
949        let partition_keys = window_agg
950            .partition_keys
951            .iter()
952            .map(|expr| {
953                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
954            })
955            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
956
957        if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
958            let input_order_mode = match input_order_mode {
959                window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
960                window_agg_exec_node::InputOrderMode::PartiallySorted(
961                    protobuf::PartiallySortedInputOrderMode { columns },
962                ) => InputOrderMode::PartiallySorted(
963                    columns.iter().map(|c| *c as usize).collect(),
964                ),
965                window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
966            };
967
968            Ok(Arc::new(BoundedWindowAggExec::try_new(
969                physical_window_expr,
970                input,
971                input_order_mode,
972                !partition_keys.is_empty(),
973            )?))
974        } else {
975            Ok(Arc::new(WindowAggExec::try_new(
976                physical_window_expr,
977                input,
978                !partition_keys.is_empty(),
979            )?))
980        }
981    }
982
983    fn try_into_aggregate_physical_plan(
984        &self,
985        hash_agg: &protobuf::AggregateExecNode,
986        ctx: &SessionContext,
987        runtime: &RuntimeEnv,
988        extension_codec: &dyn PhysicalExtensionCodec,
989    ) -> Result<Arc<dyn ExecutionPlan>> {
990        let input: Arc<dyn ExecutionPlan> =
991            into_physical_plan(&hash_agg.input, ctx, runtime, extension_codec)?;
992        let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
993            proto_error(format!(
994                "Received a AggregateNode message with unknown AggregateMode {}",
995                hash_agg.mode
996            ))
997        })?;
998        let agg_mode: AggregateMode = match mode {
999            protobuf::AggregateMode::Partial => AggregateMode::Partial,
1000            protobuf::AggregateMode::Final => AggregateMode::Final,
1001            protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
1002            protobuf::AggregateMode::Single => AggregateMode::Single,
1003            protobuf::AggregateMode::SinglePartitioned => {
1004                AggregateMode::SinglePartitioned
1005            }
1006        };
1007
1008        let num_expr = hash_agg.group_expr.len();
1009
1010        let group_expr = hash_agg
1011            .group_expr
1012            .iter()
1013            .zip(hash_agg.group_expr_name.iter())
1014            .map(|(expr, name)| {
1015                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
1016                    .map(|expr| (expr, name.to_string()))
1017            })
1018            .collect::<Result<Vec<_>, _>>()?;
1019
1020        let null_expr = hash_agg
1021            .null_expr
1022            .iter()
1023            .zip(hash_agg.group_expr_name.iter())
1024            .map(|(expr, name)| {
1025                parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
1026                    .map(|expr| (expr, name.to_string()))
1027            })
1028            .collect::<Result<Vec<_>, _>>()?;
1029
1030        let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
1031            hash_agg
1032                .groups
1033                .chunks(num_expr)
1034                .map(|g| g.to_vec())
1035                .collect::<Vec<Vec<bool>>>()
1036        } else {
1037            vec![]
1038        };
1039
1040        let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
1041            DataFusionError::Internal(
1042                "input_schema in AggregateNode is missing.".to_owned(),
1043            )
1044        })?;
1045        let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
1046
1047        let physical_filter_expr = hash_agg
1048            .filter_expr
1049            .iter()
1050            .map(|expr| {
1051                expr.expr
1052                    .as_ref()
1053                    .map(|e| {
1054                        parse_physical_expr(e, ctx, &physical_schema, extension_codec)
1055                    })
1056                    .transpose()
1057            })
1058            .collect::<Result<Vec<_>, _>>()?;
1059
1060        let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1061            .aggr_expr
1062            .iter()
1063            .zip(hash_agg.aggr_expr_name.iter())
1064            .map(|(expr, name)| {
1065                let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1066                    proto_error("Unexpected empty aggregate physical expression")
1067                })?;
1068
1069                match expr_type {
1070                    ExprType::AggregateExpr(agg_node) => {
1071                        let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1072                            .expr
1073                            .iter()
1074                            .map(|e| {
1075                                parse_physical_expr(
1076                                    e,
1077                                    ctx,
1078                                    &physical_schema,
1079                                    extension_codec,
1080                                )
1081                            })
1082                            .collect::<Result<Vec<_>>>()?;
1083                        let order_bys = agg_node
1084                            .ordering_req
1085                            .iter()
1086                            .map(|e| {
1087                                parse_physical_sort_expr(
1088                                    e,
1089                                    ctx,
1090                                    &physical_schema,
1091                                    extension_codec,
1092                                )
1093                            })
1094                            .collect::<Result<_>>()?;
1095                        agg_node
1096                            .aggregate_function
1097                            .as_ref()
1098                            .map(|func| match func {
1099                                AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1100                                    let agg_udf = match &agg_node.fun_definition {
1101                                        Some(buf) => extension_codec
1102                                            .try_decode_udaf(udaf_name, buf)?,
1103                                        None => ctx.udaf(udaf_name).or_else(|_| {
1104                                            extension_codec
1105                                                .try_decode_udaf(udaf_name, &[])
1106                                        })?,
1107                                    };
1108
1109                                    AggregateExprBuilder::new(agg_udf, input_phy_expr)
1110                                        .schema(Arc::clone(&physical_schema))
1111                                        .alias(name)
1112                                        .human_display(agg_node.human_display.clone())
1113                                        .with_ignore_nulls(agg_node.ignore_nulls)
1114                                        .with_distinct(agg_node.distinct)
1115                                        .order_by(order_bys)
1116                                        .build()
1117                                        .map(Arc::new)
1118                                }
1119                            })
1120                            .transpose()?
1121                            .ok_or_else(|| {
1122                                proto_error(
1123                                    "Invalid AggregateExpr, missing aggregate_function",
1124                                )
1125                            })
1126                    }
1127                    _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1128                }
1129            })
1130            .collect::<Result<Vec<_>, _>>()?;
1131
1132        let limit = hash_agg
1133            .limit
1134            .as_ref()
1135            .map(|lit_value| lit_value.limit as usize);
1136
1137        let agg = AggregateExec::try_new(
1138            agg_mode,
1139            PhysicalGroupBy::new(group_expr, null_expr, groups),
1140            physical_aggr_expr,
1141            physical_filter_expr,
1142            input,
1143            physical_schema,
1144        )?;
1145
1146        let agg = agg.with_limit(limit);
1147
1148        Ok(Arc::new(agg))
1149    }
1150
1151    fn try_into_hash_join_physical_plan(
1152        &self,
1153        hashjoin: &protobuf::HashJoinExecNode,
1154        ctx: &SessionContext,
1155        runtime: &RuntimeEnv,
1156        extension_codec: &dyn PhysicalExtensionCodec,
1157    ) -> Result<Arc<dyn ExecutionPlan>> {
1158        let left: Arc<dyn ExecutionPlan> =
1159            into_physical_plan(&hashjoin.left, ctx, runtime, extension_codec)?;
1160        let right: Arc<dyn ExecutionPlan> =
1161            into_physical_plan(&hashjoin.right, ctx, runtime, extension_codec)?;
1162        let left_schema = left.schema();
1163        let right_schema = right.schema();
1164        let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1165            .on
1166            .iter()
1167            .map(|col| {
1168                let left = parse_physical_expr(
1169                    &col.left.clone().unwrap(),
1170                    ctx,
1171                    left_schema.as_ref(),
1172                    extension_codec,
1173                )?;
1174                let right = parse_physical_expr(
1175                    &col.right.clone().unwrap(),
1176                    ctx,
1177                    right_schema.as_ref(),
1178                    extension_codec,
1179                )?;
1180                Ok((left, right))
1181            })
1182            .collect::<Result<_>>()?;
1183        let join_type =
1184            protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1185                proto_error(format!(
1186                    "Received a HashJoinNode message with unknown JoinType {}",
1187                    hashjoin.join_type
1188                ))
1189            })?;
1190        let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1191            .map_err(|_| {
1192                proto_error(format!(
1193                    "Received a HashJoinNode message with unknown NullEquality {}",
1194                    hashjoin.null_equality
1195                ))
1196            })?;
1197        let filter = hashjoin
1198            .filter
1199            .as_ref()
1200            .map(|f| {
1201                let schema = f
1202                    .schema
1203                    .as_ref()
1204                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1205                    .try_into()?;
1206
1207                let expression = parse_physical_expr(
1208                    f.expression.as_ref().ok_or_else(|| {
1209                        proto_error("Unexpected empty filter expression")
1210                    })?,
1211                    ctx, &schema,
1212                    extension_codec,
1213                )?;
1214                let column_indices = f.column_indices
1215                    .iter()
1216                    .map(|i| {
1217                        let side = protobuf::JoinSide::try_from(i.side)
1218                            .map_err(|_| proto_error(format!(
1219                                "Received a HashJoinNode message with JoinSide in Filter {}",
1220                                i.side))
1221                            )?;
1222
1223                        Ok(ColumnIndex {
1224                            index: i.index as usize,
1225                            side: side.into(),
1226                        })
1227                    })
1228                    .collect::<Result<Vec<_>>>()?;
1229
1230                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1231            })
1232            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1233
1234        let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1235            .map_err(|_| {
1236            proto_error(format!(
1237                "Received a HashJoinNode message with unknown PartitionMode {}",
1238                hashjoin.partition_mode
1239            ))
1240        })?;
1241        let partition_mode = match partition_mode {
1242            protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1243            protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1244            protobuf::PartitionMode::Auto => PartitionMode::Auto,
1245        };
1246        let projection = if !hashjoin.projection.is_empty() {
1247            Some(
1248                hashjoin
1249                    .projection
1250                    .iter()
1251                    .map(|i| *i as usize)
1252                    .collect::<Vec<_>>(),
1253            )
1254        } else {
1255            None
1256        };
1257        Ok(Arc::new(HashJoinExec::try_new(
1258            left,
1259            right,
1260            on,
1261            filter,
1262            &join_type.into(),
1263            projection,
1264            partition_mode,
1265            null_equality.into(),
1266        )?))
1267    }
1268
1269    fn try_into_symmetric_hash_join_physical_plan(
1270        &self,
1271        sym_join: &protobuf::SymmetricHashJoinExecNode,
1272        ctx: &SessionContext,
1273        runtime: &RuntimeEnv,
1274        extension_codec: &dyn PhysicalExtensionCodec,
1275    ) -> Result<Arc<dyn ExecutionPlan>> {
1276        let left = into_physical_plan(&sym_join.left, ctx, runtime, extension_codec)?;
1277        let right = into_physical_plan(&sym_join.right, ctx, runtime, extension_codec)?;
1278        let left_schema = left.schema();
1279        let right_schema = right.schema();
1280        let on = sym_join
1281            .on
1282            .iter()
1283            .map(|col| {
1284                let left = parse_physical_expr(
1285                    &col.left.clone().unwrap(),
1286                    ctx,
1287                    left_schema.as_ref(),
1288                    extension_codec,
1289                )?;
1290                let right = parse_physical_expr(
1291                    &col.right.clone().unwrap(),
1292                    ctx,
1293                    right_schema.as_ref(),
1294                    extension_codec,
1295                )?;
1296                Ok((left, right))
1297            })
1298            .collect::<Result<_>>()?;
1299        let join_type =
1300            protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1301                proto_error(format!(
1302                    "Received a SymmetricHashJoin message with unknown JoinType {}",
1303                    sym_join.join_type
1304                ))
1305            })?;
1306        let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1307            .map_err(|_| {
1308                proto_error(format!(
1309                    "Received a SymmetricHashJoin message with unknown NullEquality {}",
1310                    sym_join.null_equality
1311                ))
1312            })?;
1313        let filter = sym_join
1314            .filter
1315            .as_ref()
1316            .map(|f| {
1317                let schema = f
1318                    .schema
1319                    .as_ref()
1320                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1321                    .try_into()?;
1322
1323                let expression = parse_physical_expr(
1324                    f.expression.as_ref().ok_or_else(|| {
1325                        proto_error("Unexpected empty filter expression")
1326                    })?,
1327                    ctx, &schema,
1328                    extension_codec,
1329                )?;
1330                let column_indices = f.column_indices
1331                    .iter()
1332                    .map(|i| {
1333                        let side = protobuf::JoinSide::try_from(i.side)
1334                            .map_err(|_| proto_error(format!(
1335                                "Received a HashJoinNode message with JoinSide in Filter {}",
1336                                i.side))
1337                            )?;
1338
1339                        Ok(ColumnIndex {
1340                            index: i.index as usize,
1341                            side: side.into(),
1342                        })
1343                    })
1344                    .collect::<Result<_>>()?;
1345
1346                Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1347            })
1348            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1349
1350        let left_sort_exprs = parse_physical_sort_exprs(
1351            &sym_join.left_sort_exprs,
1352            ctx,
1353            &left_schema,
1354            extension_codec,
1355        )?;
1356        let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1357
1358        let right_sort_exprs = parse_physical_sort_exprs(
1359            &sym_join.right_sort_exprs,
1360            ctx,
1361            &right_schema,
1362            extension_codec,
1363        )?;
1364        let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1365
1366        let partition_mode = protobuf::StreamPartitionMode::try_from(
1367            sym_join.partition_mode,
1368        )
1369        .map_err(|_| {
1370            proto_error(format!(
1371                "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1372                sym_join.partition_mode
1373            ))
1374        })?;
1375        let partition_mode = match partition_mode {
1376            protobuf::StreamPartitionMode::SinglePartition => {
1377                StreamJoinPartitionMode::SinglePartition
1378            }
1379            protobuf::StreamPartitionMode::PartitionedExec => {
1380                StreamJoinPartitionMode::Partitioned
1381            }
1382        };
1383        SymmetricHashJoinExec::try_new(
1384            left,
1385            right,
1386            on,
1387            filter,
1388            &join_type.into(),
1389            null_equality.into(),
1390            left_sort_exprs,
1391            right_sort_exprs,
1392            partition_mode,
1393        )
1394        .map(|e| Arc::new(e) as _)
1395    }
1396
1397    fn try_into_union_physical_plan(
1398        &self,
1399        union: &protobuf::UnionExecNode,
1400        ctx: &SessionContext,
1401        runtime: &RuntimeEnv,
1402        extension_codec: &dyn PhysicalExtensionCodec,
1403    ) -> Result<Arc<dyn ExecutionPlan>> {
1404        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1405        for input in &union.inputs {
1406            inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?);
1407        }
1408        Ok(Arc::new(UnionExec::new(inputs)))
1409    }
1410
1411    fn try_into_interleave_physical_plan(
1412        &self,
1413        interleave: &protobuf::InterleaveExecNode,
1414        ctx: &SessionContext,
1415        runtime: &RuntimeEnv,
1416        extension_codec: &dyn PhysicalExtensionCodec,
1417    ) -> Result<Arc<dyn ExecutionPlan>> {
1418        let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1419        for input in &interleave.inputs {
1420            inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?);
1421        }
1422        Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1423    }
1424
1425    fn try_into_cross_join_physical_plan(
1426        &self,
1427        crossjoin: &protobuf::CrossJoinExecNode,
1428        ctx: &SessionContext,
1429        runtime: &RuntimeEnv,
1430        extension_codec: &dyn PhysicalExtensionCodec,
1431    ) -> Result<Arc<dyn ExecutionPlan>> {
1432        let left: Arc<dyn ExecutionPlan> =
1433            into_physical_plan(&crossjoin.left, ctx, runtime, extension_codec)?;
1434        let right: Arc<dyn ExecutionPlan> =
1435            into_physical_plan(&crossjoin.right, ctx, runtime, extension_codec)?;
1436        Ok(Arc::new(CrossJoinExec::new(left, right)))
1437    }
1438
1439    fn try_into_empty_physical_plan(
1440        &self,
1441        empty: &protobuf::EmptyExecNode,
1442        _ctx: &SessionContext,
1443        _runtime: &RuntimeEnv,
1444        _extension_codec: &dyn PhysicalExtensionCodec,
1445    ) -> Result<Arc<dyn ExecutionPlan>> {
1446        let schema = Arc::new(convert_required!(empty.schema)?);
1447        Ok(Arc::new(EmptyExec::new(schema)))
1448    }
1449
1450    fn try_into_placeholder_row_physical_plan(
1451        &self,
1452        placeholder: &protobuf::PlaceholderRowExecNode,
1453        _ctx: &SessionContext,
1454        _runtime: &RuntimeEnv,
1455        _extension_codec: &dyn PhysicalExtensionCodec,
1456    ) -> Result<Arc<dyn ExecutionPlan>> {
1457        let schema = Arc::new(convert_required!(placeholder.schema)?);
1458        Ok(Arc::new(PlaceholderRowExec::new(schema)))
1459    }
1460
1461    fn try_into_sort_physical_plan(
1462        &self,
1463        sort: &protobuf::SortExecNode,
1464        ctx: &SessionContext,
1465        runtime: &RuntimeEnv,
1466        extension_codec: &dyn PhysicalExtensionCodec,
1467    ) -> Result<Arc<dyn ExecutionPlan>> {
1468        let input = into_physical_plan(&sort.input, ctx, runtime, extension_codec)?;
1469        let exprs = sort
1470            .expr
1471            .iter()
1472            .map(|expr| {
1473                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1474                    proto_error(format!(
1475                        "physical_plan::from_proto() Unexpected expr {self:?}"
1476                    ))
1477                })?;
1478                if let ExprType::Sort(sort_expr) = expr {
1479                    let expr = sort_expr
1480                        .expr
1481                        .as_ref()
1482                        .ok_or_else(|| {
1483                            proto_error(format!(
1484                                "physical_plan::from_proto() Unexpected sort expr {self:?}"
1485                            ))
1486                        })?
1487                        .as_ref();
1488                    Ok(PhysicalSortExpr {
1489                        expr: parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)?,
1490                        options: SortOptions {
1491                            descending: !sort_expr.asc,
1492                            nulls_first: sort_expr.nulls_first,
1493                        },
1494                    })
1495                } else {
1496                    internal_err!(
1497                        "physical_plan::from_proto() {self:?}"
1498                    )
1499                }
1500            })
1501            .collect::<Result<Vec<_>>>()?;
1502        let Some(ordering) = LexOrdering::new(exprs) else {
1503            return internal_err!("SortExec requires an ordering");
1504        };
1505        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1506        let new_sort = SortExec::new(ordering, input)
1507            .with_fetch(fetch)
1508            .with_preserve_partitioning(sort.preserve_partitioning);
1509
1510        Ok(Arc::new(new_sort))
1511    }
1512
1513    fn try_into_sort_preserving_merge_physical_plan(
1514        &self,
1515        sort: &protobuf::SortPreservingMergeExecNode,
1516        ctx: &SessionContext,
1517        runtime: &RuntimeEnv,
1518        extension_codec: &dyn PhysicalExtensionCodec,
1519    ) -> Result<Arc<dyn ExecutionPlan>> {
1520        let input = into_physical_plan(&sort.input, ctx, runtime, extension_codec)?;
1521        let exprs = sort
1522            .expr
1523            .iter()
1524            .map(|expr| {
1525                let expr = expr.expr_type.as_ref().ok_or_else(|| {
1526                    proto_error(format!(
1527                        "physical_plan::from_proto() Unexpected expr {self:?}"
1528                    ))
1529                })?;
1530                if let ExprType::Sort(sort_expr) = expr {
1531                    let expr = sort_expr
1532                        .expr
1533                        .as_ref()
1534                        .ok_or_else(|| {
1535                            proto_error(format!(
1536                            "physical_plan::from_proto() Unexpected sort expr {self:?}"
1537                        ))
1538                        })?
1539                        .as_ref();
1540                    Ok(PhysicalSortExpr {
1541                        expr: parse_physical_expr(
1542                            expr,
1543                            ctx,
1544                            input.schema().as_ref(),
1545                            extension_codec,
1546                        )?,
1547                        options: SortOptions {
1548                            descending: !sort_expr.asc,
1549                            nulls_first: sort_expr.nulls_first,
1550                        },
1551                    })
1552                } else {
1553                    internal_err!("physical_plan::from_proto() {self:?}")
1554                }
1555            })
1556            .collect::<Result<Vec<_>>>()?;
1557        let Some(ordering) = LexOrdering::new(exprs) else {
1558            return internal_err!("SortExec requires an ordering");
1559        };
1560        let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1561        Ok(Arc::new(
1562            SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1563        ))
1564    }
1565
1566    fn try_into_extension_physical_plan(
1567        &self,
1568        extension: &protobuf::PhysicalExtensionNode,
1569        ctx: &SessionContext,
1570        runtime: &RuntimeEnv,
1571        extension_codec: &dyn PhysicalExtensionCodec,
1572    ) -> Result<Arc<dyn ExecutionPlan>> {
1573        let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1574            .inputs
1575            .iter()
1576            .map(|i| i.try_into_physical_plan(ctx, runtime, extension_codec))
1577            .collect::<Result<_>>()?;
1578
1579        let extension_node =
1580            extension_codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;
1581
1582        Ok(extension_node)
1583    }
1584
1585    fn try_into_nested_loop_join_physical_plan(
1586        &self,
1587        join: &protobuf::NestedLoopJoinExecNode,
1588        ctx: &SessionContext,
1589        runtime: &RuntimeEnv,
1590        extension_codec: &dyn PhysicalExtensionCodec,
1591    ) -> Result<Arc<dyn ExecutionPlan>> {
1592        let left: Arc<dyn ExecutionPlan> =
1593            into_physical_plan(&join.left, ctx, runtime, extension_codec)?;
1594        let right: Arc<dyn ExecutionPlan> =
1595            into_physical_plan(&join.right, ctx, runtime, extension_codec)?;
1596        let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1597            proto_error(format!(
1598                "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1599                join.join_type
1600            ))
1601        })?;
1602        let filter = join
1603                    .filter
1604                    .as_ref()
1605                    .map(|f| {
1606                        let schema = f
1607                            .schema
1608                            .as_ref()
1609                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1610                            .try_into()?;
1611
1612                        let expression = parse_physical_expr(
1613                            f.expression.as_ref().ok_or_else(|| {
1614                                proto_error("Unexpected empty filter expression")
1615                            })?,
1616                            ctx, &schema,
1617                            extension_codec,
1618                        )?;
1619                        let column_indices = f.column_indices
1620                            .iter()
1621                            .map(|i| {
1622                                let side = protobuf::JoinSide::try_from(i.side)
1623                                    .map_err(|_| proto_error(format!(
1624                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1625                                        i.side))
1626                                    )?;
1627
1628                                Ok(ColumnIndex {
1629                                    index: i.index as usize,
1630                                    side: side.into(),
1631                                })
1632                            })
1633                            .collect::<Result<Vec<_>>>()?;
1634
1635                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1636                    })
1637                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1638
1639        let projection = if !join.projection.is_empty() {
1640            Some(
1641                join.projection
1642                    .iter()
1643                    .map(|i| *i as usize)
1644                    .collect::<Vec<_>>(),
1645            )
1646        } else {
1647            None
1648        };
1649
1650        Ok(Arc::new(NestedLoopJoinExec::try_new(
1651            left,
1652            right,
1653            filter,
1654            &join_type.into(),
1655            projection,
1656        )?))
1657    }
1658
1659    fn try_into_analyze_physical_plan(
1660        &self,
1661        analyze: &protobuf::AnalyzeExecNode,
1662        ctx: &SessionContext,
1663        runtime: &RuntimeEnv,
1664        extension_codec: &dyn PhysicalExtensionCodec,
1665    ) -> Result<Arc<dyn ExecutionPlan>> {
1666        let input: Arc<dyn ExecutionPlan> =
1667            into_physical_plan(&analyze.input, ctx, runtime, extension_codec)?;
1668        Ok(Arc::new(AnalyzeExec::new(
1669            analyze.verbose,
1670            analyze.show_statistics,
1671            input,
1672            Arc::new(convert_required!(analyze.schema)?),
1673        )))
1674    }
1675
1676    fn try_into_json_sink_physical_plan(
1677        &self,
1678        sink: &protobuf::JsonSinkExecNode,
1679        ctx: &SessionContext,
1680        runtime: &RuntimeEnv,
1681        extension_codec: &dyn PhysicalExtensionCodec,
1682    ) -> Result<Arc<dyn ExecutionPlan>> {
1683        let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?;
1684
1685        let data_sink: JsonSink = sink
1686            .sink
1687            .as_ref()
1688            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1689            .try_into()?;
1690        let sink_schema = input.schema();
1691        let sort_order = sink
1692            .sort_order
1693            .as_ref()
1694            .map(|collection| {
1695                parse_physical_sort_exprs(
1696                    &collection.physical_sort_expr_nodes,
1697                    ctx,
1698                    &sink_schema,
1699                    extension_codec,
1700                )
1701                .map(|sort_exprs| {
1702                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1703                })
1704            })
1705            .transpose()?
1706            .flatten();
1707        Ok(Arc::new(DataSinkExec::new(
1708            input,
1709            Arc::new(data_sink),
1710            sort_order,
1711        )))
1712    }
1713
1714    fn try_into_csv_sink_physical_plan(
1715        &self,
1716        sink: &protobuf::CsvSinkExecNode,
1717        ctx: &SessionContext,
1718        runtime: &RuntimeEnv,
1719        extension_codec: &dyn PhysicalExtensionCodec,
1720    ) -> Result<Arc<dyn ExecutionPlan>> {
1721        let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?;
1722
1723        let data_sink: CsvSink = sink
1724            .sink
1725            .as_ref()
1726            .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1727            .try_into()?;
1728        let sink_schema = input.schema();
1729        let sort_order = sink
1730            .sort_order
1731            .as_ref()
1732            .map(|collection| {
1733                parse_physical_sort_exprs(
1734                    &collection.physical_sort_expr_nodes,
1735                    ctx,
1736                    &sink_schema,
1737                    extension_codec,
1738                )
1739                .map(|sort_exprs| {
1740                    LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1741                })
1742            })
1743            .transpose()?
1744            .flatten();
1745        Ok(Arc::new(DataSinkExec::new(
1746            input,
1747            Arc::new(data_sink),
1748            sort_order,
1749        )))
1750    }
1751
1752    fn try_into_parquet_sink_physical_plan(
1753        &self,
1754        sink: &protobuf::ParquetSinkExecNode,
1755        ctx: &SessionContext,
1756        runtime: &RuntimeEnv,
1757        extension_codec: &dyn PhysicalExtensionCodec,
1758    ) -> Result<Arc<dyn ExecutionPlan>> {
1759        #[cfg(feature = "parquet")]
1760        {
1761            let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?;
1762
1763            let data_sink: ParquetSink = sink
1764                .sink
1765                .as_ref()
1766                .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1767                .try_into()?;
1768            let sink_schema = input.schema();
1769            let sort_order = sink
1770                .sort_order
1771                .as_ref()
1772                .map(|collection| {
1773                    parse_physical_sort_exprs(
1774                        &collection.physical_sort_expr_nodes,
1775                        ctx,
1776                        &sink_schema,
1777                        extension_codec,
1778                    )
1779                    .map(|sort_exprs| {
1780                        LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1781                    })
1782                })
1783                .transpose()?
1784                .flatten();
1785            Ok(Arc::new(DataSinkExec::new(
1786                input,
1787                Arc::new(data_sink),
1788                sort_order,
1789            )))
1790        }
1791        #[cfg(not(feature = "parquet"))]
1792        panic!("Trying to use ParquetSink without `parquet` feature enabled");
1793    }
1794
1795    fn try_into_unnest_physical_plan(
1796        &self,
1797        unnest: &protobuf::UnnestExecNode,
1798        ctx: &SessionContext,
1799        runtime: &RuntimeEnv,
1800        extension_codec: &dyn PhysicalExtensionCodec,
1801    ) -> Result<Arc<dyn ExecutionPlan>> {
1802        let input = into_physical_plan(&unnest.input, ctx, runtime, extension_codec)?;
1803
1804        Ok(Arc::new(UnnestExec::new(
1805            input,
1806            unnest
1807                .list_type_columns
1808                .iter()
1809                .map(|c| ListUnnest {
1810                    index_in_input_schema: c.index_in_input_schema as _,
1811                    depth: c.depth as _,
1812                })
1813                .collect(),
1814            unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1815            Arc::new(convert_required!(unnest.schema)?),
1816            into_required!(unnest.options)?,
1817        )))
1818    }
1819
1820    fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
1821        match name {
1822            protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
1823            protobuf::GenerateSeriesName::GsRange => "range",
1824        }
1825    }
1826    fn try_into_sort_join(
1827        &self,
1828        sort_join: &SortMergeJoinExecNode,
1829        ctx: &SessionContext,
1830        runtime: &RuntimeEnv,
1831        extension_codec: &dyn PhysicalExtensionCodec,
1832    ) -> Result<Arc<dyn ExecutionPlan>> {
1833        let left = into_physical_plan(&sort_join.left, ctx, runtime, extension_codec)?;
1834        let left_schema = left.schema();
1835        let right = into_physical_plan(&sort_join.right, ctx, runtime, extension_codec)?;
1836        let right_schema = right.schema();
1837
1838        let filter = sort_join
1839            .filter
1840            .as_ref()
1841            .map(|f| {
1842                let schema = f
1843                    .schema
1844                    .as_ref()
1845                    .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1846                    .try_into()?;
1847
1848                let expression = parse_physical_expr(
1849                    f.expression.as_ref().ok_or_else(|| {
1850                        proto_error("Unexpected empty filter expression")
1851                    })?,
1852                    ctx,
1853                    &schema,
1854                    extension_codec,
1855                )?;
1856                let column_indices = f
1857                    .column_indices
1858                    .iter()
1859                    .map(|i| {
1860                        let side =
1861                            protobuf::JoinSide::try_from(i.side).map_err(|_| {
1862                                proto_error(format!(
1863                                    "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
1864                                    i.side
1865                                ))
1866                            })?;
1867
1868                        Ok(ColumnIndex {
1869                            index: i.index as usize,
1870                            side: side.into(),
1871                        })
1872                    })
1873                    .collect::<Result<Vec<_>>>()?;
1874
1875                Ok(JoinFilter::new(
1876                    expression,
1877                    column_indices,
1878                    Arc::new(schema),
1879                ))
1880            })
1881            .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1882
1883        let join_type =
1884            protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
1885                proto_error(format!(
1886                    "Received a SortMergeJoinExecNode message with unknown JoinType {}",
1887                    sort_join.join_type
1888                ))
1889            })?;
1890
1891        let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
1892            .map_err(|_| {
1893                proto_error(format!(
1894                    "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
1895                    sort_join.null_equality
1896                ))
1897            })?;
1898
1899        let sort_options = sort_join
1900            .sort_options
1901            .iter()
1902            .map(|e| SortOptions {
1903                descending: !e.asc,
1904                nulls_first: e.nulls_first,
1905            })
1906            .collect();
1907        let on = sort_join
1908            .on
1909            .iter()
1910            .map(|col| {
1911                let left = parse_physical_expr(
1912                    &col.left.clone().unwrap(),
1913                    ctx,
1914                    left_schema.as_ref(),
1915                    extension_codec,
1916                )?;
1917                let right = parse_physical_expr(
1918                    &col.right.clone().unwrap(),
1919                    ctx,
1920                    right_schema.as_ref(),
1921                    extension_codec,
1922                )?;
1923                Ok((left, right))
1924            })
1925            .collect::<Result<_>>()?;
1926
1927        Ok(Arc::new(SortMergeJoinExec::try_new(
1928            left,
1929            right,
1930            on,
1931            filter,
1932            join_type.into(),
1933            sort_options,
1934            null_equality.into(),
1935        )?))
1936    }
1937
1938    fn try_into_generate_series_physical_plan(
1939        &self,
1940        generate_series: &protobuf::GenerateSeriesNode,
1941    ) -> Result<Arc<dyn ExecutionPlan>> {
1942        let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);
1943
1944        let args = match &generate_series.args {
1945            Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
1946                GenSeriesArgs::ContainsNull {
1947                    name: Self::generate_series_name_to_str(args.name()),
1948                }
1949            }
1950            Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
1951                GenSeriesArgs::Int64Args {
1952                    start: args.start,
1953                    end: args.end,
1954                    step: args.step,
1955                    include_end: args.include_end,
1956                    name: Self::generate_series_name_to_str(args.name()),
1957                }
1958            }
1959            Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
1960                let step_proto = args.step.as_ref().ok_or_else(|| {
1961                    DataFusionError::Internal("Missing step in TimestampArgs".to_string())
1962                })?;
1963                let step = IntervalMonthDayNanoType::make_value(
1964                    step_proto.months,
1965                    step_proto.days,
1966                    step_proto.nanos,
1967                );
1968                GenSeriesArgs::TimestampArgs {
1969                    start: args.start,
1970                    end: args.end,
1971                    step,
1972                    tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
1973                    include_end: args.include_end,
1974                    name: Self::generate_series_name_to_str(args.name()),
1975                }
1976            }
1977            Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
1978                let step_proto = args.step.as_ref().ok_or_else(|| {
1979                    DataFusionError::Internal("Missing step in DateArgs".to_string())
1980                })?;
1981                let step = IntervalMonthDayNanoType::make_value(
1982                    step_proto.months,
1983                    step_proto.days,
1984                    step_proto.nanos,
1985                );
1986                GenSeriesArgs::DateArgs {
1987                    start: args.start,
1988                    end: args.end,
1989                    step,
1990                    include_end: args.include_end,
1991                    name: Self::generate_series_name_to_str(args.name()),
1992                }
1993            }
1994            None => return internal_err!("Missing args in GenerateSeriesNode"),
1995        };
1996
1997        let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
1998        let generator = table.as_generator(generate_series.target_batch_size as usize)?;
1999
2000        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
2001    }
2002
2003    fn try_into_cooperative_physical_plan(
2004        &self,
2005        field_stream: &protobuf::CooperativeExecNode,
2006        ctx: &SessionContext,
2007        runtime: &RuntimeEnv,
2008        extension_codec: &dyn PhysicalExtensionCodec,
2009    ) -> Result<Arc<dyn ExecutionPlan>> {
2010        let input =
2011            into_physical_plan(&field_stream.input, ctx, runtime, extension_codec)?;
2012        Ok(Arc::new(CooperativeExec::new(input)))
2013    }
2014
2015    fn try_from_explain_exec(
2016        exec: &ExplainExec,
2017        _extension_codec: &dyn PhysicalExtensionCodec,
2018    ) -> Result<Self> {
2019        Ok(protobuf::PhysicalPlanNode {
2020            physical_plan_type: Some(PhysicalPlanType::Explain(
2021                protobuf::ExplainExecNode {
2022                    schema: Some(exec.schema().as_ref().try_into()?),
2023                    stringified_plans: exec
2024                        .stringified_plans()
2025                        .iter()
2026                        .map(|plan| plan.into())
2027                        .collect(),
2028                    verbose: exec.verbose(),
2029                },
2030            )),
2031        })
2032    }
2033
2034    fn try_from_projection_exec(
2035        exec: &ProjectionExec,
2036        extension_codec: &dyn PhysicalExtensionCodec,
2037    ) -> Result<Self> {
2038        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2039            exec.input().to_owned(),
2040            extension_codec,
2041        )?;
2042        let expr = exec
2043            .expr()
2044            .iter()
2045            .map(|proj_expr| serialize_physical_expr(&proj_expr.expr, extension_codec))
2046            .collect::<Result<Vec<_>>>()?;
2047        let expr_name = exec
2048            .expr()
2049            .iter()
2050            .map(|proj_expr| proj_expr.alias.clone())
2051            .collect();
2052        Ok(protobuf::PhysicalPlanNode {
2053            physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
2054                protobuf::ProjectionExecNode {
2055                    input: Some(Box::new(input)),
2056                    expr,
2057                    expr_name,
2058                },
2059            ))),
2060        })
2061    }
2062
2063    fn try_from_analyze_exec(
2064        exec: &AnalyzeExec,
2065        extension_codec: &dyn PhysicalExtensionCodec,
2066    ) -> Result<Self> {
2067        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2068            exec.input().to_owned(),
2069            extension_codec,
2070        )?;
2071        Ok(protobuf::PhysicalPlanNode {
2072            physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2073                protobuf::AnalyzeExecNode {
2074                    verbose: exec.verbose(),
2075                    show_statistics: exec.show_statistics(),
2076                    input: Some(Box::new(input)),
2077                    schema: Some(exec.schema().as_ref().try_into()?),
2078                },
2079            ))),
2080        })
2081    }
2082
2083    fn try_from_filter_exec(
2084        exec: &FilterExec,
2085        extension_codec: &dyn PhysicalExtensionCodec,
2086    ) -> Result<Self> {
2087        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2088            exec.input().to_owned(),
2089            extension_codec,
2090        )?;
2091        Ok(protobuf::PhysicalPlanNode {
2092            physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2093                protobuf::FilterExecNode {
2094                    input: Some(Box::new(input)),
2095                    expr: Some(serialize_physical_expr(
2096                        exec.predicate(),
2097                        extension_codec,
2098                    )?),
2099                    default_filter_selectivity: exec.default_selectivity() as u32,
2100                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
2101                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2102                    }),
2103                },
2104            ))),
2105        })
2106    }
2107
2108    fn try_from_global_limit_exec(
2109        limit: &GlobalLimitExec,
2110        extension_codec: &dyn PhysicalExtensionCodec,
2111    ) -> Result<Self> {
2112        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2113            limit.input().to_owned(),
2114            extension_codec,
2115        )?;
2116
2117        Ok(protobuf::PhysicalPlanNode {
2118            physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2119                protobuf::GlobalLimitExecNode {
2120                    input: Some(Box::new(input)),
2121                    skip: limit.skip() as u32,
2122                    fetch: match limit.fetch() {
2123                        Some(n) => n as i64,
2124                        _ => -1, // no limit
2125                    },
2126                },
2127            ))),
2128        })
2129    }
2130
2131    fn try_from_local_limit_exec(
2132        limit: &LocalLimitExec,
2133        extension_codec: &dyn PhysicalExtensionCodec,
2134    ) -> Result<Self> {
2135        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2136            limit.input().to_owned(),
2137            extension_codec,
2138        )?;
2139        Ok(protobuf::PhysicalPlanNode {
2140            physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2141                protobuf::LocalLimitExecNode {
2142                    input: Some(Box::new(input)),
2143                    fetch: limit.fetch() as u32,
2144                },
2145            ))),
2146        })
2147    }
2148
2149    fn try_from_hash_join_exec(
2150        exec: &HashJoinExec,
2151        extension_codec: &dyn PhysicalExtensionCodec,
2152    ) -> Result<Self> {
2153        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2154            exec.left().to_owned(),
2155            extension_codec,
2156        )?;
2157        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2158            exec.right().to_owned(),
2159            extension_codec,
2160        )?;
2161        let on: Vec<protobuf::JoinOn> = exec
2162            .on()
2163            .iter()
2164            .map(|tuple| {
2165                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2166                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2167                Ok::<_, DataFusionError>(protobuf::JoinOn {
2168                    left: Some(l),
2169                    right: Some(r),
2170                })
2171            })
2172            .collect::<Result<_>>()?;
2173        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2174        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2175        let filter = exec
2176            .filter()
2177            .as_ref()
2178            .map(|f| {
2179                let expression =
2180                    serialize_physical_expr(f.expression(), extension_codec)?;
2181                let column_indices = f
2182                    .column_indices()
2183                    .iter()
2184                    .map(|i| {
2185                        let side: protobuf::JoinSide = i.side.to_owned().into();
2186                        protobuf::ColumnIndex {
2187                            index: i.index as u32,
2188                            side: side.into(),
2189                        }
2190                    })
2191                    .collect();
2192                let schema = f.schema().as_ref().try_into()?;
2193                Ok(protobuf::JoinFilter {
2194                    expression: Some(expression),
2195                    column_indices,
2196                    schema: Some(schema),
2197                })
2198            })
2199            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2200
2201        let partition_mode = match exec.partition_mode() {
2202            PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2203            PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2204            PartitionMode::Auto => protobuf::PartitionMode::Auto,
2205        };
2206
2207        Ok(protobuf::PhysicalPlanNode {
2208            physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2209                protobuf::HashJoinExecNode {
2210                    left: Some(Box::new(left)),
2211                    right: Some(Box::new(right)),
2212                    on,
2213                    join_type: join_type.into(),
2214                    partition_mode: partition_mode.into(),
2215                    null_equality: null_equality.into(),
2216                    filter,
2217                    projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2218                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2219                    }),
2220                },
2221            ))),
2222        })
2223    }
2224
2225    fn try_from_symmetric_hash_join_exec(
2226        exec: &SymmetricHashJoinExec,
2227        extension_codec: &dyn PhysicalExtensionCodec,
2228    ) -> Result<Self> {
2229        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2230            exec.left().to_owned(),
2231            extension_codec,
2232        )?;
2233        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2234            exec.right().to_owned(),
2235            extension_codec,
2236        )?;
2237        let on = exec
2238            .on()
2239            .iter()
2240            .map(|tuple| {
2241                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2242                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2243                Ok::<_, DataFusionError>(protobuf::JoinOn {
2244                    left: Some(l),
2245                    right: Some(r),
2246                })
2247            })
2248            .collect::<Result<_>>()?;
2249        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2250        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2251        let filter = exec
2252            .filter()
2253            .as_ref()
2254            .map(|f| {
2255                let expression =
2256                    serialize_physical_expr(f.expression(), extension_codec)?;
2257                let column_indices = f
2258                    .column_indices()
2259                    .iter()
2260                    .map(|i| {
2261                        let side: protobuf::JoinSide = i.side.to_owned().into();
2262                        protobuf::ColumnIndex {
2263                            index: i.index as u32,
2264                            side: side.into(),
2265                        }
2266                    })
2267                    .collect();
2268                let schema = f.schema().as_ref().try_into()?;
2269                Ok(protobuf::JoinFilter {
2270                    expression: Some(expression),
2271                    column_indices,
2272                    schema: Some(schema),
2273                })
2274            })
2275            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2276
2277        let partition_mode = match exec.partition_mode() {
2278            StreamJoinPartitionMode::SinglePartition => {
2279                protobuf::StreamPartitionMode::SinglePartition
2280            }
2281            StreamJoinPartitionMode::Partitioned => {
2282                protobuf::StreamPartitionMode::PartitionedExec
2283            }
2284        };
2285
2286        let left_sort_exprs = exec
2287            .left_sort_exprs()
2288            .map(|exprs| {
2289                exprs
2290                    .iter()
2291                    .map(|expr| {
2292                        Ok(protobuf::PhysicalSortExprNode {
2293                            expr: Some(Box::new(serialize_physical_expr(
2294                                &expr.expr,
2295                                extension_codec,
2296                            )?)),
2297                            asc: !expr.options.descending,
2298                            nulls_first: expr.options.nulls_first,
2299                        })
2300                    })
2301                    .collect::<Result<Vec<_>>>()
2302            })
2303            .transpose()?
2304            .unwrap_or(vec![]);
2305
2306        let right_sort_exprs = exec
2307            .right_sort_exprs()
2308            .map(|exprs| {
2309                exprs
2310                    .iter()
2311                    .map(|expr| {
2312                        Ok(protobuf::PhysicalSortExprNode {
2313                            expr: Some(Box::new(serialize_physical_expr(
2314                                &expr.expr,
2315                                extension_codec,
2316                            )?)),
2317                            asc: !expr.options.descending,
2318                            nulls_first: expr.options.nulls_first,
2319                        })
2320                    })
2321                    .collect::<Result<Vec<_>>>()
2322            })
2323            .transpose()?
2324            .unwrap_or(vec![]);
2325
2326        Ok(protobuf::PhysicalPlanNode {
2327            physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2328                protobuf::SymmetricHashJoinExecNode {
2329                    left: Some(Box::new(left)),
2330                    right: Some(Box::new(right)),
2331                    on,
2332                    join_type: join_type.into(),
2333                    partition_mode: partition_mode.into(),
2334                    null_equality: null_equality.into(),
2335                    left_sort_exprs,
2336                    right_sort_exprs,
2337                    filter,
2338                },
2339            ))),
2340        })
2341    }
2342
2343    fn try_from_sort_merge_join_exec(
2344        exec: &SortMergeJoinExec,
2345        extension_codec: &dyn PhysicalExtensionCodec,
2346    ) -> Result<Self> {
2347        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2348            exec.left().to_owned(),
2349            extension_codec,
2350        )?;
2351        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2352            exec.right().to_owned(),
2353            extension_codec,
2354        )?;
2355        let on = exec
2356            .on()
2357            .iter()
2358            .map(|tuple| {
2359                let l = serialize_physical_expr(&tuple.0, extension_codec)?;
2360                let r = serialize_physical_expr(&tuple.1, extension_codec)?;
2361                Ok::<_, DataFusionError>(protobuf::JoinOn {
2362                    left: Some(l),
2363                    right: Some(r),
2364                })
2365            })
2366            .collect::<Result<_>>()?;
2367        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2368        let null_equality: protobuf::NullEquality = exec.null_equality().into();
2369        let filter = exec
2370            .filter()
2371            .as_ref()
2372            .map(|f| {
2373                let expression =
2374                    serialize_physical_expr(f.expression(), extension_codec)?;
2375                let column_indices = f
2376                    .column_indices()
2377                    .iter()
2378                    .map(|i| {
2379                        let side: protobuf::JoinSide = i.side.to_owned().into();
2380                        protobuf::ColumnIndex {
2381                            index: i.index as u32,
2382                            side: side.into(),
2383                        }
2384                    })
2385                    .collect();
2386                let schema = f.schema().as_ref().try_into()?;
2387                Ok(protobuf::JoinFilter {
2388                    expression: Some(expression),
2389                    column_indices,
2390                    schema: Some(schema),
2391                })
2392            })
2393            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2394
2395        let sort_options = exec
2396            .sort_options()
2397            .iter()
2398            .map(
2399                |SortOptions {
2400                     descending,
2401                     nulls_first,
2402                 }| {
2403                    SortExprNode {
2404                        expr: None,
2405                        asc: !*descending,
2406                        nulls_first: *nulls_first,
2407                    }
2408                },
2409            )
2410            .collect();
2411
2412        Ok(protobuf::PhysicalPlanNode {
2413            physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2414                protobuf::SortMergeJoinExecNode {
2415                    left: Some(Box::new(left)),
2416                    right: Some(Box::new(right)),
2417                    on,
2418                    join_type: join_type.into(),
2419                    null_equality: null_equality.into(),
2420                    filter,
2421                    sort_options,
2422                },
2423            ))),
2424        })
2425    }
2426
2427    fn try_from_cross_join_exec(
2428        exec: &CrossJoinExec,
2429        extension_codec: &dyn PhysicalExtensionCodec,
2430    ) -> Result<Self> {
2431        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2432            exec.left().to_owned(),
2433            extension_codec,
2434        )?;
2435        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2436            exec.right().to_owned(),
2437            extension_codec,
2438        )?;
2439        Ok(protobuf::PhysicalPlanNode {
2440            physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2441                protobuf::CrossJoinExecNode {
2442                    left: Some(Box::new(left)),
2443                    right: Some(Box::new(right)),
2444                },
2445            ))),
2446        })
2447    }
2448
2449    fn try_from_aggregate_exec(
2450        exec: &AggregateExec,
2451        extension_codec: &dyn PhysicalExtensionCodec,
2452    ) -> Result<Self> {
2453        let groups: Vec<bool> = exec
2454            .group_expr()
2455            .groups()
2456            .iter()
2457            .flatten()
2458            .copied()
2459            .collect();
2460
2461        let group_names = exec
2462            .group_expr()
2463            .expr()
2464            .iter()
2465            .map(|expr| expr.1.to_owned())
2466            .collect();
2467
2468        let filter = exec
2469            .filter_expr()
2470            .iter()
2471            .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
2472            .collect::<Result<Vec<_>>>()?;
2473
2474        let agg = exec
2475            .aggr_expr()
2476            .iter()
2477            .map(|expr| serialize_physical_aggr_expr(expr.to_owned(), extension_codec))
2478            .collect::<Result<Vec<_>>>()?;
2479
2480        let agg_names = exec
2481            .aggr_expr()
2482            .iter()
2483            .map(|expr| expr.name().to_string())
2484            .collect::<Vec<_>>();
2485
2486        let agg_mode = match exec.mode() {
2487            AggregateMode::Partial => protobuf::AggregateMode::Partial,
2488            AggregateMode::Final => protobuf::AggregateMode::Final,
2489            AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2490            AggregateMode::Single => protobuf::AggregateMode::Single,
2491            AggregateMode::SinglePartitioned => {
2492                protobuf::AggregateMode::SinglePartitioned
2493            }
2494        };
2495        let input_schema = exec.input_schema();
2496        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2497            exec.input().to_owned(),
2498            extension_codec,
2499        )?;
2500
2501        let null_expr = exec
2502            .group_expr()
2503            .null_expr()
2504            .iter()
2505            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2506            .collect::<Result<Vec<_>>>()?;
2507
2508        let group_expr = exec
2509            .group_expr()
2510            .expr()
2511            .iter()
2512            .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
2513            .collect::<Result<Vec<_>>>()?;
2514
2515        let limit = exec.limit().map(|value| protobuf::AggLimit {
2516            limit: value as u64,
2517        });
2518
2519        Ok(protobuf::PhysicalPlanNode {
2520            physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2521                protobuf::AggregateExecNode {
2522                    group_expr,
2523                    group_expr_name: group_names,
2524                    aggr_expr: agg,
2525                    filter_expr: filter,
2526                    aggr_expr_name: agg_names,
2527                    mode: agg_mode as i32,
2528                    input: Some(Box::new(input)),
2529                    input_schema: Some(input_schema.as_ref().try_into()?),
2530                    null_expr,
2531                    groups,
2532                    limit,
2533                },
2534            ))),
2535        })
2536    }
2537
2538    fn try_from_empty_exec(
2539        empty: &EmptyExec,
2540        _extension_codec: &dyn PhysicalExtensionCodec,
2541    ) -> Result<Self> {
2542        let schema = empty.schema().as_ref().try_into()?;
2543        Ok(protobuf::PhysicalPlanNode {
2544            physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2545                schema: Some(schema),
2546            })),
2547        })
2548    }
2549
2550    fn try_from_placeholder_row_exec(
2551        empty: &PlaceholderRowExec,
2552        _extension_codec: &dyn PhysicalExtensionCodec,
2553    ) -> Result<Self> {
2554        let schema = empty.schema().as_ref().try_into()?;
2555        Ok(protobuf::PhysicalPlanNode {
2556            physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2557                protobuf::PlaceholderRowExecNode {
2558                    schema: Some(schema),
2559                },
2560            )),
2561        })
2562    }
2563
2564    fn try_from_coalesce_batches_exec(
2565        coalesce_batches: &CoalesceBatchesExec,
2566        extension_codec: &dyn PhysicalExtensionCodec,
2567    ) -> Result<Self> {
2568        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2569            coalesce_batches.input().to_owned(),
2570            extension_codec,
2571        )?;
2572        Ok(protobuf::PhysicalPlanNode {
2573            physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2574                protobuf::CoalesceBatchesExecNode {
2575                    input: Some(Box::new(input)),
2576                    target_batch_size: coalesce_batches.target_batch_size() as u32,
2577                    fetch: coalesce_batches.fetch().map(|n| n as u32),
2578                },
2579            ))),
2580        })
2581    }
2582
2583    fn try_from_data_source_exec(
2584        data_source_exec: &DataSourceExec,
2585        extension_codec: &dyn PhysicalExtensionCodec,
2586    ) -> Result<Option<Self>> {
2587        let data_source = data_source_exec.data_source();
2588        if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2589            let source = maybe_csv.file_source();
2590            if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
2591                return Ok(Some(protobuf::PhysicalPlanNode {
2592                    physical_plan_type: Some(PhysicalPlanType::CsvScan(
2593                        protobuf::CsvScanExecNode {
2594                            base_conf: Some(serialize_file_scan_config(
2595                                maybe_csv,
2596                                extension_codec,
2597                            )?),
2598                            has_header: csv_config.has_header(),
2599                            delimiter: byte_to_string(
2600                                csv_config.delimiter(),
2601                                "delimiter",
2602                            )?,
2603                            quote: byte_to_string(csv_config.quote(), "quote")?,
2604                            optional_escape: if let Some(escape) = csv_config.escape() {
2605                                Some(
2606                                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(
2607                                        byte_to_string(escape, "escape")?,
2608                                    ),
2609                                )
2610                            } else {
2611                                None
2612                            },
2613                            optional_comment: if let Some(comment) = csv_config.comment()
2614                            {
2615                                Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
2616                                        byte_to_string(comment, "comment")?,
2617                                    ))
2618                            } else {
2619                                None
2620                            },
2621                            newlines_in_values: maybe_csv.newlines_in_values(),
2622                        },
2623                    )),
2624                }));
2625            }
2626        }
2627
2628        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2629            let source = scan_conf.file_source();
2630            if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
2631                return Ok(Some(protobuf::PhysicalPlanNode {
2632                    physical_plan_type: Some(PhysicalPlanType::JsonScan(
2633                        protobuf::JsonScanExecNode {
2634                            base_conf: Some(serialize_file_scan_config(
2635                                scan_conf,
2636                                extension_codec,
2637                            )?),
2638                        },
2639                    )),
2640                }));
2641            }
2642        }
2643
2644        #[cfg(feature = "parquet")]
2645        if let Some((maybe_parquet, conf)) =
2646            data_source_exec.downcast_to_file_source::<ParquetSource>()
2647        {
2648            let predicate = conf
2649                .predicate()
2650                .map(|pred| serialize_physical_expr(pred, extension_codec))
2651                .transpose()?;
2652            return Ok(Some(protobuf::PhysicalPlanNode {
2653                physical_plan_type: Some(PhysicalPlanType::ParquetScan(
2654                    protobuf::ParquetScanExecNode {
2655                        base_conf: Some(serialize_file_scan_config(
2656                            maybe_parquet,
2657                            extension_codec,
2658                        )?),
2659                        predicate,
2660                        parquet_options: Some(conf.table_parquet_options().try_into()?),
2661                    },
2662                )),
2663            }));
2664        }
2665
2666        #[cfg(feature = "avro")]
2667        if let Some(maybe_avro) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2668            let source = maybe_avro.file_source();
2669            if source.as_any().downcast_ref::<AvroSource>().is_some() {
2670                return Ok(Some(protobuf::PhysicalPlanNode {
2671                    physical_plan_type: Some(PhysicalPlanType::AvroScan(
2672                        protobuf::AvroScanExecNode {
2673                            base_conf: Some(serialize_file_scan_config(
2674                                maybe_avro,
2675                                extension_codec,
2676                            )?),
2677                        },
2678                    )),
2679                }));
2680            }
2681        }
2682
2683        if let Some(source_conf) =
2684            data_source.as_any().downcast_ref::<MemorySourceConfig>()
2685        {
2686            let proto_partitions = source_conf
2687                .partitions()
2688                .iter()
2689                .map(|p| serialize_record_batches(p))
2690                .collect::<Result<Vec<_>>>()?;
2691
2692            let proto_schema: protobuf::Schema =
2693                source_conf.original_schema().as_ref().try_into()?;
2694
2695            let proto_projection = source_conf
2696                .projection()
2697                .as_ref()
2698                .map_or_else(Vec::new, |v| {
2699                    v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2700                });
2701
2702            let proto_sort_information = source_conf
2703                .sort_information()
2704                .iter()
2705                .map(|ordering| {
2706                    let sort_exprs = serialize_physical_sort_exprs(
2707                        ordering.to_owned(),
2708                        extension_codec,
2709                    )?;
2710                    Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
2711                        physical_sort_expr_nodes: sort_exprs,
2712                    })
2713                })
2714                .collect::<Result<Vec<_>, _>>()?;
2715
2716            return Ok(Some(protobuf::PhysicalPlanNode {
2717                physical_plan_type: Some(PhysicalPlanType::MemoryScan(
2718                    protobuf::MemoryScanExecNode {
2719                        partitions: proto_partitions,
2720                        schema: Some(proto_schema),
2721                        projection: proto_projection,
2722                        sort_information: proto_sort_information,
2723                        show_sizes: source_conf.show_sizes(),
2724                        fetch: source_conf.fetch().map(|f| f as u32),
2725                    },
2726                )),
2727            }));
2728        }
2729
2730        Ok(None)
2731    }
2732
2733    fn try_from_coalesce_partitions_exec(
2734        exec: &CoalescePartitionsExec,
2735        extension_codec: &dyn PhysicalExtensionCodec,
2736    ) -> Result<Self> {
2737        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2738            exec.input().to_owned(),
2739            extension_codec,
2740        )?;
2741        Ok(protobuf::PhysicalPlanNode {
2742            physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
2743                protobuf::CoalescePartitionsExecNode {
2744                    input: Some(Box::new(input)),
2745                    fetch: exec.fetch().map(|f| f as u32),
2746                },
2747            ))),
2748        })
2749    }
2750
2751    fn try_from_repartition_exec(
2752        exec: &RepartitionExec,
2753        extension_codec: &dyn PhysicalExtensionCodec,
2754    ) -> Result<Self> {
2755        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2756            exec.input().to_owned(),
2757            extension_codec,
2758        )?;
2759
2760        let pb_partitioning =
2761            serialize_partitioning(exec.partitioning(), extension_codec)?;
2762
2763        Ok(protobuf::PhysicalPlanNode {
2764            physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
2765                protobuf::RepartitionExecNode {
2766                    input: Some(Box::new(input)),
2767                    partitioning: Some(pb_partitioning),
2768                },
2769            ))),
2770        })
2771    }
2772
2773    fn try_from_sort_exec(
2774        exec: &SortExec,
2775        extension_codec: &dyn PhysicalExtensionCodec,
2776    ) -> Result<Self> {
2777        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2778            exec.input().to_owned(),
2779            extension_codec,
2780        )?;
2781        let expr = exec
2782            .expr()
2783            .iter()
2784            .map(|expr| {
2785                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2786                    expr: Some(Box::new(serialize_physical_expr(
2787                        &expr.expr,
2788                        extension_codec,
2789                    )?)),
2790                    asc: !expr.options.descending,
2791                    nulls_first: expr.options.nulls_first,
2792                });
2793                Ok(protobuf::PhysicalExprNode {
2794                    expr_type: Some(ExprType::Sort(sort_expr)),
2795                })
2796            })
2797            .collect::<Result<Vec<_>>>()?;
2798        Ok(protobuf::PhysicalPlanNode {
2799            physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
2800                protobuf::SortExecNode {
2801                    input: Some(Box::new(input)),
2802                    expr,
2803                    fetch: match exec.fetch() {
2804                        Some(n) => n as i64,
2805                        _ => -1,
2806                    },
2807                    preserve_partitioning: exec.preserve_partitioning(),
2808                },
2809            ))),
2810        })
2811    }
2812
2813    fn try_from_union_exec(
2814        union: &UnionExec,
2815        extension_codec: &dyn PhysicalExtensionCodec,
2816    ) -> Result<Self> {
2817        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2818        for input in union.inputs() {
2819            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2820                input.to_owned(),
2821                extension_codec,
2822            )?);
2823        }
2824        Ok(protobuf::PhysicalPlanNode {
2825            physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
2826                inputs,
2827            })),
2828        })
2829    }
2830
2831    fn try_from_interleave_exec(
2832        interleave: &InterleaveExec,
2833        extension_codec: &dyn PhysicalExtensionCodec,
2834    ) -> Result<Self> {
2835        let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
2836        for input in interleave.inputs() {
2837            inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
2838                input.to_owned(),
2839                extension_codec,
2840            )?);
2841        }
2842        Ok(protobuf::PhysicalPlanNode {
2843            physical_plan_type: Some(PhysicalPlanType::Interleave(
2844                protobuf::InterleaveExecNode { inputs },
2845            )),
2846        })
2847    }
2848
2849    fn try_from_sort_preserving_merge_exec(
2850        exec: &SortPreservingMergeExec,
2851        extension_codec: &dyn PhysicalExtensionCodec,
2852    ) -> Result<Self> {
2853        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2854            exec.input().to_owned(),
2855            extension_codec,
2856        )?;
2857        let expr = exec
2858            .expr()
2859            .iter()
2860            .map(|expr| {
2861                let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
2862                    expr: Some(Box::new(serialize_physical_expr(
2863                        &expr.expr,
2864                        extension_codec,
2865                    )?)),
2866                    asc: !expr.options.descending,
2867                    nulls_first: expr.options.nulls_first,
2868                });
2869                Ok(protobuf::PhysicalExprNode {
2870                    expr_type: Some(ExprType::Sort(sort_expr)),
2871                })
2872            })
2873            .collect::<Result<Vec<_>>>()?;
2874        Ok(protobuf::PhysicalPlanNode {
2875            physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
2876                protobuf::SortPreservingMergeExecNode {
2877                    input: Some(Box::new(input)),
2878                    expr,
2879                    fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
2880                },
2881            ))),
2882        })
2883    }
2884
2885    fn try_from_nested_loop_join_exec(
2886        exec: &NestedLoopJoinExec,
2887        extension_codec: &dyn PhysicalExtensionCodec,
2888    ) -> Result<Self> {
2889        let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
2890            exec.left().to_owned(),
2891            extension_codec,
2892        )?;
2893        let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
2894            exec.right().to_owned(),
2895            extension_codec,
2896        )?;
2897
2898        let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2899        let filter = exec
2900            .filter()
2901            .as_ref()
2902            .map(|f| {
2903                let expression =
2904                    serialize_physical_expr(f.expression(), extension_codec)?;
2905                let column_indices = f
2906                    .column_indices()
2907                    .iter()
2908                    .map(|i| {
2909                        let side: protobuf::JoinSide = i.side.to_owned().into();
2910                        protobuf::ColumnIndex {
2911                            index: i.index as u32,
2912                            side: side.into(),
2913                        }
2914                    })
2915                    .collect();
2916                let schema = f.schema().as_ref().try_into()?;
2917                Ok(protobuf::JoinFilter {
2918                    expression: Some(expression),
2919                    column_indices,
2920                    schema: Some(schema),
2921                })
2922            })
2923            .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2924
2925        Ok(protobuf::PhysicalPlanNode {
2926            physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
2927                protobuf::NestedLoopJoinExecNode {
2928                    left: Some(Box::new(left)),
2929                    right: Some(Box::new(right)),
2930                    join_type: join_type.into(),
2931                    filter,
2932                    projection: exec.projection().map_or_else(Vec::new, |v| {
2933                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2934                    }),
2935                },
2936            ))),
2937        })
2938    }
2939
2940    fn try_from_window_agg_exec(
2941        exec: &WindowAggExec,
2942        extension_codec: &dyn PhysicalExtensionCodec,
2943    ) -> Result<Self> {
2944        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2945            exec.input().to_owned(),
2946            extension_codec,
2947        )?;
2948
2949        let window_expr = exec
2950            .window_expr()
2951            .iter()
2952            .map(|e| serialize_physical_window_expr(e, extension_codec))
2953            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2954
2955        let partition_keys = exec
2956            .partition_keys()
2957            .iter()
2958            .map(|e| serialize_physical_expr(e, extension_codec))
2959            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2960
2961        Ok(protobuf::PhysicalPlanNode {
2962            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
2963                protobuf::WindowAggExecNode {
2964                    input: Some(Box::new(input)),
2965                    window_expr,
2966                    partition_keys,
2967                    input_order_mode: None,
2968                },
2969            ))),
2970        })
2971    }
2972
2973    fn try_from_bounded_window_agg_exec(
2974        exec: &BoundedWindowAggExec,
2975        extension_codec: &dyn PhysicalExtensionCodec,
2976    ) -> Result<Self> {
2977        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2978            exec.input().to_owned(),
2979            extension_codec,
2980        )?;
2981
2982        let window_expr = exec
2983            .window_expr()
2984            .iter()
2985            .map(|e| serialize_physical_window_expr(e, extension_codec))
2986            .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
2987
2988        let partition_keys = exec
2989            .partition_keys()
2990            .iter()
2991            .map(|e| serialize_physical_expr(e, extension_codec))
2992            .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
2993
2994        let input_order_mode = match &exec.input_order_mode {
2995            InputOrderMode::Linear => {
2996                window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
2997            }
2998            InputOrderMode::PartiallySorted(columns) => {
2999                window_agg_exec_node::InputOrderMode::PartiallySorted(
3000                    protobuf::PartiallySortedInputOrderMode {
3001                        columns: columns.iter().map(|c| *c as u64).collect(),
3002                    },
3003                )
3004            }
3005            InputOrderMode::Sorted => {
3006                window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
3007            }
3008        };
3009
3010        Ok(protobuf::PhysicalPlanNode {
3011            physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3012                protobuf::WindowAggExecNode {
3013                    input: Some(Box::new(input)),
3014                    window_expr,
3015                    partition_keys,
3016                    input_order_mode: Some(input_order_mode),
3017                },
3018            ))),
3019        })
3020    }
3021
3022    fn try_from_data_sink_exec(
3023        exec: &DataSinkExec,
3024        extension_codec: &dyn PhysicalExtensionCodec,
3025    ) -> Result<Option<Self>> {
3026        let input: protobuf::PhysicalPlanNode =
3027            protobuf::PhysicalPlanNode::try_from_physical_plan(
3028                exec.input().to_owned(),
3029                extension_codec,
3030            )?;
3031        let sort_order = match exec.sort_order() {
3032            Some(requirements) => {
3033                let expr = requirements
3034                    .iter()
3035                    .map(|requirement| {
3036                        let expr: PhysicalSortExpr = requirement.to_owned().into();
3037                        let sort_expr = protobuf::PhysicalSortExprNode {
3038                            expr: Some(Box::new(serialize_physical_expr(
3039                                &expr.expr,
3040                                extension_codec,
3041                            )?)),
3042                            asc: !expr.options.descending,
3043                            nulls_first: expr.options.nulls_first,
3044                        };
3045                        Ok(sort_expr)
3046                    })
3047                    .collect::<Result<Vec<_>>>()?;
3048                Some(protobuf::PhysicalSortExprNodeCollection {
3049                    physical_sort_expr_nodes: expr,
3050                })
3051            }
3052            None => None,
3053        };
3054
3055        if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
3056            return Ok(Some(protobuf::PhysicalPlanNode {
3057                physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
3058                    protobuf::JsonSinkExecNode {
3059                        input: Some(Box::new(input)),
3060                        sink: Some(sink.try_into()?),
3061                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3062                        sort_order,
3063                    },
3064                ))),
3065            }));
3066        }
3067
3068        if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
3069            return Ok(Some(protobuf::PhysicalPlanNode {
3070                physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
3071                    protobuf::CsvSinkExecNode {
3072                        input: Some(Box::new(input)),
3073                        sink: Some(sink.try_into()?),
3074                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3075                        sort_order,
3076                    },
3077                ))),
3078            }));
3079        }
3080
3081        #[cfg(feature = "parquet")]
3082        if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
3083            return Ok(Some(protobuf::PhysicalPlanNode {
3084                physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
3085                    protobuf::ParquetSinkExecNode {
3086                        input: Some(Box::new(input)),
3087                        sink: Some(sink.try_into()?),
3088                        sink_schema: Some(exec.schema().as_ref().try_into()?),
3089                        sort_order,
3090                    },
3091                ))),
3092            }));
3093        }
3094
3095        // If unknown DataSink then let extension handle it
3096        Ok(None)
3097    }
3098
3099    fn try_from_unnest_exec(
3100        exec: &UnnestExec,
3101        extension_codec: &dyn PhysicalExtensionCodec,
3102    ) -> Result<Self> {
3103        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3104            exec.input().to_owned(),
3105            extension_codec,
3106        )?;
3107
3108        Ok(protobuf::PhysicalPlanNode {
3109            physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3110                protobuf::UnnestExecNode {
3111                    input: Some(Box::new(input)),
3112                    schema: Some(exec.schema().try_into()?),
3113                    list_type_columns: exec
3114                        .list_column_indices()
3115                        .iter()
3116                        .map(|c| ProtoListUnnest {
3117                            index_in_input_schema: c.index_in_input_schema as _,
3118                            depth: c.depth as _,
3119                        })
3120                        .collect(),
3121                    struct_type_columns: exec
3122                        .struct_column_indices()
3123                        .iter()
3124                        .map(|c| *c as _)
3125                        .collect(),
3126                    options: Some(exec.options().into()),
3127                },
3128            ))),
3129        })
3130    }
3131
3132    fn try_from_cooperative_exec(
3133        exec: &CooperativeExec,
3134        extension_codec: &dyn PhysicalExtensionCodec,
3135    ) -> Result<Self> {
3136        let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3137            exec.input().to_owned(),
3138            extension_codec,
3139        )?;
3140
3141        Ok(protobuf::PhysicalPlanNode {
3142            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3143                protobuf::CooperativeExecNode {
3144                    input: Some(Box::new(input)),
3145                },
3146            ))),
3147        })
3148    }
3149
3150    fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3151        match name {
3152            "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3153            "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3154            _ => internal_err!("unknown name: {name}"),
3155        }
3156    }
3157
3158    fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
3159        let generators = exec.generators();
3160
3161        // ensure we only have one generator
3162        let [generator] = generators.as_slice() else {
3163            return Ok(None);
3164        };
3165
3166        let generator_guard = generator.read();
3167
3168        // Try to downcast to different generate_series types
3169        if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
3170            let schema = exec.schema();
3171            let node = protobuf::GenerateSeriesNode {
3172                schema: Some(schema.as_ref().try_into()?),
3173                target_batch_size: 8192, // Default batch size
3174                args: Some(protobuf::generate_series_node::Args::ContainsNull(
3175                    protobuf::GenerateSeriesArgsContainsNull {
3176                        name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
3177                    },
3178                )),
3179            };
3180
3181            return Ok(Some(protobuf::PhysicalPlanNode {
3182                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3183            }));
3184        }
3185
3186        if let Some(int_64) = generator_guard
3187            .as_any()
3188            .downcast_ref::<GenericSeriesState<i64>>()
3189        {
3190            let schema = exec.schema();
3191            let node = protobuf::GenerateSeriesNode {
3192                schema: Some(schema.as_ref().try_into()?),
3193                target_batch_size: int_64.batch_size() as u32,
3194                args: Some(protobuf::generate_series_node::Args::Int64Args(
3195                    protobuf::GenerateSeriesArgsInt64 {
3196                        start: *int_64.start(),
3197                        end: *int_64.end(),
3198                        step: *int_64.step(),
3199                        include_end: int_64.include_end(),
3200                        name: Self::str_to_generate_series_name(int_64.name())? as i32,
3201                    },
3202                )),
3203            };
3204
3205            return Ok(Some(protobuf::PhysicalPlanNode {
3206                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3207            }));
3208        }
3209
3210        if let Some(timestamp_args) = generator_guard
3211            .as_any()
3212            .downcast_ref::<GenericSeriesState<TimestampValue>>()
3213        {
3214            let schema = exec.schema();
3215
3216            let start = timestamp_args.start().value();
3217            let end = timestamp_args.end().value();
3218
3219            let step_value = timestamp_args.step();
3220
3221            let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
3222                months: step_value.months,
3223                days: step_value.days,
3224                nanos: step_value.nanoseconds,
3225            });
3226            let include_end = timestamp_args.include_end();
3227            let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;
3228
3229            let args = match timestamp_args.current().tz_str() {
3230                Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
3231                    protobuf::GenerateSeriesArgsTimestamp {
3232                        start,
3233                        end,
3234                        step,
3235                        include_end,
3236                        name,
3237                        tz: Some(tz.to_string()),
3238                    },
3239                ),
3240                None => protobuf::generate_series_node::Args::DateArgs(
3241                    protobuf::GenerateSeriesArgsDate {
3242                        start,
3243                        end,
3244                        step,
3245                        include_end,
3246                        name,
3247                    },
3248                ),
3249            };
3250
3251            let node = protobuf::GenerateSeriesNode {
3252                schema: Some(schema.as_ref().try_into()?),
3253                target_batch_size: timestamp_args.batch_size() as u32,
3254                args: Some(args),
3255            };
3256
3257            return Ok(Some(protobuf::PhysicalPlanNode {
3258                physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3259            }));
3260        }
3261
3262        Ok(None)
3263    }
3264}
3265
/// Conversion between a serialized physical plan representation (for example
/// [`protobuf::PhysicalPlanNode`]) and an executable [`ExecutionPlan`].
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes the serialized representation from the bytes in `buf`.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this serialized representation into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Rebuilds an [`ExecutionPlan`] from this serialized representation.
    ///
    /// `extension_codec` is consulted for parts that the built-in conversion
    /// does not handle (presumably custom nodes and functions — confirm in the
    /// implementation).
    fn try_into_physical_plan(
        &self,
        ctx: &SessionContext,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the serialized representation of `plan`.
    ///
    /// `extension_codec` is used for plan parts the built-in conversion cannot
    /// represent on its own.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
3290
/// Serialization hooks for the parts of a physical plan that the built-in
/// protobuf conversion does not handle: custom [`ExecutionPlan`] nodes, custom
/// [`PhysicalExpr`]s, and user-defined scalar, aggregate and window functions.
///
/// The bytes written by the `try_encode*` methods are opaque to this module.
/// Presumably they are handed back unchanged to the matching `try_decode*`
/// method when the plan is deserialized (TODO confirm against the callers).
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a custom [`ExecutionPlan`] from `buf`.
    ///
    /// `inputs` are presumably the node's already-decoded child plans (verify
    /// against the caller). `registry` can be used to resolve functions that
    /// the node references.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes the custom [`ExecutionPlan`] `node` into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes the scalar function named `name` from its encoded bytes.
    ///
    /// The default returns a not-implemented error naming the function.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes extra state for a scalar function into `_buf`.
    ///
    /// The default writes nothing and returns `Ok(())`.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes a custom [`PhysicalExpr`] whose child expressions are `_inputs`
    /// (assumed to be already decoded — confirm against the caller).
    ///
    /// The default returns a not-implemented error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes a custom [`PhysicalExpr`] into `_buf`.
    ///
    /// The default returns a not-implemented error, i.e. custom expressions
    /// are unsupported unless an implementation overrides this.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes the aggregate function named `name` from its encoded bytes.
    ///
    /// The default returns a not-implemented error naming the function.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes extra state for an aggregate function into `_buf`.
    ///
    /// The default writes nothing and returns `Ok(())`.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window function named `name` from its encoded bytes.
    ///
    /// The default returns a not-implemented error naming the function.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes extra state for a window function into `_buf`.
    ///
    /// The default writes nothing and returns `Ok(())`.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
3343
/// A [`PhysicalExtensionCodec`] with no extension support.
///
/// Encoding or decoding a custom [`ExecutionPlan`] returns a not-implemented
/// error; every other hook keeps the trait's default implementation.
#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}
3346
3347impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
3348    fn try_decode(
3349        &self,
3350        _buf: &[u8],
3351        _inputs: &[Arc<dyn ExecutionPlan>],
3352        _registry: &dyn FunctionRegistry,
3353    ) -> Result<Arc<dyn ExecutionPlan>> {
3354        not_impl_err!("PhysicalExtensionCodec is not provided")
3355    }
3356
3357    fn try_encode(
3358        &self,
3359        _node: Arc<dyn ExecutionPlan>,
3360        _buf: &mut Vec<u8>,
3361    ) -> Result<()> {
3362        not_impl_err!("PhysicalExtensionCodec is not provided")
3363    }
3364}
3365
/// DataEncoderTuple captures the position of the encoder
/// in the codec list that was used to encode the data and actual encoded data.
///
/// It is the envelope `ComposedPhysicalExtensionCodec` writes around the
/// output of the inner codec that succeeded, so that decoding can route the
/// blob back to the codec at the same position.
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// The position of encoder used to encode data
    /// (to be used for decoding)
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// The bytes produced by the codec at `encoder_position`.
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
3378
/// A PhysicalExtensionCodec that tries one of multiple inner codecs
/// until one works.
///
/// For the hooks it delegates:
/// - Encoding tries the inner codecs in list order and stores the first
///   successful output together with that codec's index.
/// - Decoding uses the stored index to pick the codec.
///
/// The encoding and decoding sides must therefore be built with the same
/// codec list in the same order.
#[derive(Debug)]
pub struct ComposedPhysicalExtensionCodec {
    /// Inner codecs; each element's index is part of the encoded format.
    codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}
3385
3386impl ComposedPhysicalExtensionCodec {
3387    // Position in this codecs list is important as it will be used for decoding.
3388    // If new codec is added it should go to last position.
3389    pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
3390        Self { codecs }
3391    }
3392
3393    fn decode_protobuf<R>(
3394        &self,
3395        buf: &[u8],
3396        decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
3397    ) -> Result<R> {
3398        let proto = DataEncoderTuple::decode(buf)
3399            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
3400
3401        let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
3402            DataFusionError::Internal(
3403                "Can't find required codec in codec list".to_owned(),
3404            ),
3405        )?;
3406
3407        decode(codec.as_ref(), &proto.blob)
3408    }
3409
3410    fn encode_protobuf(
3411        &self,
3412        buf: &mut Vec<u8>,
3413        mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
3414    ) -> Result<()> {
3415        let mut data = vec![];
3416        let mut last_err = None;
3417        let mut encoder_position = None;
3418
3419        // find the encoder
3420        for (position, codec) in self.codecs.iter().enumerate() {
3421            match encode(codec.as_ref(), &mut data) {
3422                Ok(_) => {
3423                    encoder_position = Some(position as u32);
3424                    break;
3425                }
3426                Err(err) => last_err = Some(err),
3427            }
3428        }
3429
3430        let encoder_position = encoder_position.ok_or_else(|| {
3431            last_err.unwrap_or_else(|| {
3432                DataFusionError::NotImplemented(
3433                    "Empty list of composed codecs".to_owned(),
3434                )
3435            })
3436        })?;
3437
3438        // encode with encoder position
3439        let proto = DataEncoderTuple {
3440            encoder_position,
3441            blob: data,
3442        };
3443        proto
3444            .encode(buf)
3445            .map_err(|e| DataFusionError::Internal(e.to_string()))
3446    }
3447}
3448
3449impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
3450    fn try_decode(
3451        &self,
3452        buf: &[u8],
3453        inputs: &[Arc<dyn ExecutionPlan>],
3454        registry: &dyn FunctionRegistry,
3455    ) -> Result<Arc<dyn ExecutionPlan>> {
3456        self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, registry))
3457    }
3458
3459    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
3460        self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
3461    }
3462
3463    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
3464        self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
3465    }
3466
3467    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
3468        self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
3469    }
3470
3471    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
3472        self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
3473    }
3474
3475    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
3476        self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
3477    }
3478}
3479
3480fn into_physical_plan(
3481    node: &Option<Box<protobuf::PhysicalPlanNode>>,
3482    ctx: &SessionContext,
3483    runtime: &RuntimeEnv,
3484    extension_codec: &dyn PhysicalExtensionCodec,
3485) -> Result<Arc<dyn ExecutionPlan>> {
3486    if let Some(field) = node {
3487        field.try_into_physical_plan(ctx, runtime, extension_codec)
3488    } else {
3489        Err(proto_error("Missing required field in protobuf"))
3490    }
3491}