datafusion_proto/physical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use datafusion::physical_expr::aggregate::AggregateExprBuilder;
22use prost::bytes::BufMut;
23use prost::Message;
24
25use datafusion::arrow::compute::SortOptions;
26use datafusion::arrow::datatypes::SchemaRef;
27use datafusion::datasource::file_format::csv::CsvSink;
28use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
29use datafusion::datasource::file_format::json::JsonSink;
30#[cfg(feature = "parquet")]
31use datafusion::datasource::file_format::parquet::ParquetSink;
32#[cfg(feature = "parquet")]
33use datafusion::datasource::physical_plan::ParquetSource;
34use datafusion::datasource::physical_plan::{AvroSource, CsvSource, FileScanConfig};
35use datafusion::datasource::source::DataSourceExec;
36use datafusion::execution::runtime_env::RuntimeEnv;
37use datafusion::execution::FunctionRegistry;
38use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
39use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
40use datafusion::physical_plan::aggregates::AggregateMode;
41use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
42use datafusion::physical_plan::analyze::AnalyzeExec;
43use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
44use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
45use datafusion::physical_plan::empty::EmptyExec;
46use datafusion::physical_plan::explain::ExplainExec;
47use datafusion::physical_plan::expressions::PhysicalSortExpr;
48use datafusion::physical_plan::filter::FilterExec;
49use datafusion::physical_plan::insert::DataSinkExec;
50use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
51use datafusion::physical_plan::joins::{
52    CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
53};
54use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
55use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
56use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
57use datafusion::physical_plan::projection::ProjectionExec;
58use datafusion::physical_plan::repartition::RepartitionExec;
59use datafusion::physical_plan::sorts::sort::SortExec;
60use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
61use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
62use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
63use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
64use datafusion::physical_plan::{
65    ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
66};
67use datafusion_common::config::TableParquetOptions;
68use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
69use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
70
71use crate::common::{byte_to_string, str_to_byte};
72use crate::physical_plan::from_proto::{
73    parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
74    parse_physical_window_expr, parse_protobuf_file_scan_config,
75    parse_protobuf_file_scan_schema,
76};
77use crate::physical_plan::to_proto::{
78    serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
79    serialize_physical_window_expr,
80};
81use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
82use crate::protobuf::physical_expr_node::ExprType;
83use crate::protobuf::physical_plan_node::PhysicalPlanType;
84use crate::protobuf::{
85    self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
86};
87use crate::{convert_required, into_required};
88
89use self::from_proto::parse_protobuf_partitioning;
90use self::to_proto::{serialize_partitioning, serialize_physical_expr};
91
92pub mod from_proto;
93pub mod to_proto;
94
95impl AsExecutionPlan for protobuf::PhysicalPlanNode {
96    fn try_decode(buf: &[u8]) -> Result<Self>
97    where
98        Self: Sized,
99    {
100        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
101            DataFusionError::Internal(format!("failed to decode physical plan: {e:?}"))
102        })
103    }
104
105    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
106    where
107        B: BufMut,
108        Self: Sized,
109    {
110        self.encode(buf).map_err(|e| {
111            DataFusionError::Internal(format!("failed to encode physical plan: {e:?}"))
112        })
113    }
114
115    fn try_into_physical_plan(
116        &self,
117        registry: &dyn FunctionRegistry,
118        runtime: &RuntimeEnv,
119        extension_codec: &dyn PhysicalExtensionCodec,
120    ) -> Result<Arc<dyn ExecutionPlan>> {
121        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
122            proto_error(format!(
123                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
124            ))
125        })?;
126        match plan {
127            PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
128                Arc::new(explain.schema.as_ref().unwrap().try_into()?),
129                explain
130                    .stringified_plans
131                    .iter()
132                    .map(|plan| plan.into())
133                    .collect(),
134                explain.verbose,
135            ))),
136            PhysicalPlanType::Projection(projection) => {
137                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
138                    &projection.input,
139                    registry,
140                    runtime,
141                    extension_codec,
142                )?;
143                let exprs = projection
144                    .expr
145                    .iter()
146                    .zip(projection.expr_name.iter())
147                    .map(|(expr, name)| {
148                        Ok((
149                            parse_physical_expr(
150                                expr,
151                                registry,
152                                input.schema().as_ref(),
153                                extension_codec,
154                            )?,
155                            name.to_string(),
156                        ))
157                    })
158                    .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
159                Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
160            }
161            PhysicalPlanType::Filter(filter) => {
162                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
163                    &filter.input,
164                    registry,
165                    runtime,
166                    extension_codec,
167                )?;
168                let predicate = filter
169                    .expr
170                    .as_ref()
171                    .map(|expr| {
172                        parse_physical_expr(
173                            expr,
174                            registry,
175                            input.schema().as_ref(),
176                            extension_codec,
177                        )
178                    })
179                    .transpose()?
180                    .ok_or_else(|| {
181                        DataFusionError::Internal(
182                            "filter (FilterExecNode) in PhysicalPlanNode is missing."
183                                .to_owned(),
184                        )
185                    })?;
186                let filter_selectivity = filter.default_filter_selectivity.try_into();
187                let projection = if !filter.projection.is_empty() {
188                    Some(
189                        filter
190                            .projection
191                            .iter()
192                            .map(|i| *i as usize)
193                            .collect::<Vec<_>>(),
194                    )
195                } else {
196                    None
197                };
198                let filter =
199                    FilterExec::try_new(predicate, input)?.with_projection(projection)?;
200                match filter_selectivity {
201                    Ok(filter_selectivity) => Ok(Arc::new(
202                        filter.with_default_selectivity(filter_selectivity)?,
203                    )),
204                    Err(_) => Err(DataFusionError::Internal(
205                        "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
206                    )),
207                }
208            }
209            PhysicalPlanType::CsvScan(scan) => {
210                let escape = if let Some(
211                    protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape),
212                ) = &scan.optional_escape
213                {
214                    Some(str_to_byte(escape, "escape")?)
215                } else {
216                    None
217                };
218
219                let comment = if let Some(
220                    protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
221                ) = &scan.optional_comment
222                {
223                    Some(str_to_byte(comment, "comment")?)
224                } else {
225                    None
226                };
227
228                let source = Arc::new(
229                    CsvSource::new(
230                        scan.has_header,
231                        str_to_byte(&scan.delimiter, "delimiter")?,
232                        0,
233                    )
234                    .with_escape(escape)
235                    .with_comment(comment),
236                );
237
238                let conf = parse_protobuf_file_scan_config(
239                    scan.base_conf.as_ref().unwrap(),
240                    registry,
241                    extension_codec,
242                    source,
243                )?
244                .with_newlines_in_values(scan.newlines_in_values)
245                .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
246                Ok(conf.build())
247            }
248            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
249            PhysicalPlanType::ParquetScan(scan) => {
250                #[cfg(feature = "parquet")]
251                {
252                    let schema = parse_protobuf_file_scan_schema(
253                        scan.base_conf.as_ref().unwrap(),
254                    )?;
255                    let predicate = scan
256                        .predicate
257                        .as_ref()
258                        .map(|expr| {
259                            parse_physical_expr(
260                                expr,
261                                registry,
262                                schema.as_ref(),
263                                extension_codec,
264                            )
265                        })
266                        .transpose()?;
267                    let mut options = TableParquetOptions::default();
268
269                    if let Some(table_options) = scan.parquet_options.as_ref() {
270                        options = table_options.try_into()?;
271                    }
272                    let mut source = ParquetSource::new(options);
273
274                    if let Some(predicate) = predicate {
275                        source = source.with_predicate(Arc::clone(&schema), predicate);
276                    }
277                    let base_config = parse_protobuf_file_scan_config(
278                        scan.base_conf.as_ref().unwrap(),
279                        registry,
280                        extension_codec,
281                        Arc::new(source),
282                    )?;
283                    Ok(base_config.build())
284                }
285                #[cfg(not(feature = "parquet"))]
286                panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
287            }
288            PhysicalPlanType::AvroScan(scan) => {
289                let conf = parse_protobuf_file_scan_config(
290                    scan.base_conf.as_ref().unwrap(),
291                    registry,
292                    extension_codec,
293                    Arc::new(AvroSource::new()),
294                )?;
295                Ok(conf.build())
296            }
297            PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
298                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
299                    &coalesce_batches.input,
300                    registry,
301                    runtime,
302                    extension_codec,
303                )?;
304                Ok(Arc::new(
305                    CoalesceBatchesExec::new(
306                        input,
307                        coalesce_batches.target_batch_size as usize,
308                    )
309                    .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
310                ))
311            }
312            PhysicalPlanType::Merge(merge) => {
313                let input: Arc<dyn ExecutionPlan> =
314                    into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
315                Ok(Arc::new(CoalescePartitionsExec::new(input)))
316            }
317            PhysicalPlanType::Repartition(repart) => {
318                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
319                    &repart.input,
320                    registry,
321                    runtime,
322                    extension_codec,
323                )?;
324                let partitioning = parse_protobuf_partitioning(
325                    repart.partitioning.as_ref(),
326                    registry,
327                    input.schema().as_ref(),
328                    extension_codec,
329                )?;
330                Ok(Arc::new(RepartitionExec::try_new(
331                    input,
332                    partitioning.unwrap(),
333                )?))
334            }
335            PhysicalPlanType::GlobalLimit(limit) => {
336                let input: Arc<dyn ExecutionPlan> =
337                    into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
338                let fetch = if limit.fetch >= 0 {
339                    Some(limit.fetch as usize)
340                } else {
341                    None
342                };
343                Ok(Arc::new(GlobalLimitExec::new(
344                    input,
345                    limit.skip as usize,
346                    fetch,
347                )))
348            }
349            PhysicalPlanType::LocalLimit(limit) => {
350                let input: Arc<dyn ExecutionPlan> =
351                    into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
352                Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
353            }
354            PhysicalPlanType::Window(window_agg) => {
355                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
356                    &window_agg.input,
357                    registry,
358                    runtime,
359                    extension_codec,
360                )?;
361                let input_schema = input.schema();
362
363                let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
364                    .window_expr
365                    .iter()
366                    .map(|window_expr| {
367                        parse_physical_window_expr(
368                            window_expr,
369                            registry,
370                            input_schema.as_ref(),
371                            extension_codec,
372                        )
373                    })
374                    .collect::<Result<Vec<_>, _>>()?;
375
376                let partition_keys = window_agg
377                    .partition_keys
378                    .iter()
379                    .map(|expr| {
380                        parse_physical_expr(
381                            expr,
382                            registry,
383                            input.schema().as_ref(),
384                            extension_codec,
385                        )
386                    })
387                    .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
388
389                if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
390                    let input_order_mode = match input_order_mode {
391                        window_agg_exec_node::InputOrderMode::Linear(_) => {
392                            InputOrderMode::Linear
393                        }
394                        window_agg_exec_node::InputOrderMode::PartiallySorted(
395                            protobuf::PartiallySortedInputOrderMode { columns },
396                        ) => InputOrderMode::PartiallySorted(
397                            columns.iter().map(|c| *c as usize).collect(),
398                        ),
399                        window_agg_exec_node::InputOrderMode::Sorted(_) => {
400                            InputOrderMode::Sorted
401                        }
402                    };
403
404                    Ok(Arc::new(BoundedWindowAggExec::try_new(
405                        physical_window_expr,
406                        input,
407                        input_order_mode,
408                        !partition_keys.is_empty(),
409                    )?))
410                } else {
411                    Ok(Arc::new(WindowAggExec::try_new(
412                        physical_window_expr,
413                        input,
414                        !partition_keys.is_empty(),
415                    )?))
416                }
417            }
418            PhysicalPlanType::Aggregate(hash_agg) => {
419                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
420                    &hash_agg.input,
421                    registry,
422                    runtime,
423                    extension_codec,
424                )?;
425                let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(
426                    |_| {
427                        proto_error(format!(
428                            "Received a AggregateNode message with unknown AggregateMode {}",
429                            hash_agg.mode
430                        ))
431                    },
432                )?;
433                let agg_mode: AggregateMode = match mode {
434                    protobuf::AggregateMode::Partial => AggregateMode::Partial,
435                    protobuf::AggregateMode::Final => AggregateMode::Final,
436                    protobuf::AggregateMode::FinalPartitioned => {
437                        AggregateMode::FinalPartitioned
438                    }
439                    protobuf::AggregateMode::Single => AggregateMode::Single,
440                    protobuf::AggregateMode::SinglePartitioned => {
441                        AggregateMode::SinglePartitioned
442                    }
443                };
444
445                let num_expr = hash_agg.group_expr.len();
446
447                let group_expr = hash_agg
448                    .group_expr
449                    .iter()
450                    .zip(hash_agg.group_expr_name.iter())
451                    .map(|(expr, name)| {
452                        parse_physical_expr(
453                            expr,
454                            registry,
455                            input.schema().as_ref(),
456                            extension_codec,
457                        )
458                        .map(|expr| (expr, name.to_string()))
459                    })
460                    .collect::<Result<Vec<_>, _>>()?;
461
462                let null_expr = hash_agg
463                    .null_expr
464                    .iter()
465                    .zip(hash_agg.group_expr_name.iter())
466                    .map(|(expr, name)| {
467                        parse_physical_expr(
468                            expr,
469                            registry,
470                            input.schema().as_ref(),
471                            extension_codec,
472                        )
473                        .map(|expr| (expr, name.to_string()))
474                    })
475                    .collect::<Result<Vec<_>, _>>()?;
476
477                let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
478                    hash_agg
479                        .groups
480                        .chunks(num_expr)
481                        .map(|g| g.to_vec())
482                        .collect::<Vec<Vec<bool>>>()
483                } else {
484                    vec![]
485                };
486
487                let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
488                    DataFusionError::Internal(
489                        "input_schema in AggregateNode is missing.".to_owned(),
490                    )
491                })?;
492                let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
493
494                let physical_filter_expr = hash_agg
495                    .filter_expr
496                    .iter()
497                    .map(|expr| {
498                        expr.expr
499                            .as_ref()
500                            .map(|e| {
501                                parse_physical_expr(
502                                    e,
503                                    registry,
504                                    &physical_schema,
505                                    extension_codec,
506                                )
507                            })
508                            .transpose()
509                    })
510                    .collect::<Result<Vec<_>, _>>()?;
511
512                let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
513                    .aggr_expr
514                    .iter()
515                    .zip(hash_agg.aggr_expr_name.iter())
516                    .map(|(expr, name)| {
517                        let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
518                            proto_error("Unexpected empty aggregate physical expression")
519                        })?;
520
521                        match expr_type {
522                            ExprType::AggregateExpr(agg_node) => {
523                                let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node.expr.iter()
524                                    .map(|e| parse_physical_expr(e, registry, &physical_schema, extension_codec)).collect::<Result<Vec<_>>>()?;
525                                let ordering_req: LexOrdering = agg_node.ordering_req.iter()
526                                    .map(|e| parse_physical_sort_expr(e, registry, &physical_schema, extension_codec))
527                                    .collect::<Result<LexOrdering>>()?;
528                                agg_node.aggregate_function.as_ref().map(|func| {
529                                    match func {
530                                        AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
531                                            let agg_udf = match &agg_node.fun_definition {
532                                                Some(buf) => extension_codec.try_decode_udaf(udaf_name, buf)?,
533                                                None => registry.udaf(udaf_name)?
534                                            };
535
536                                            AggregateExprBuilder::new(agg_udf, input_phy_expr)
537                                                .schema(Arc::clone(&physical_schema))
538                                                .alias(name)
539                                                .with_ignore_nulls(agg_node.ignore_nulls)
540                                                .with_distinct(agg_node.distinct)
541                                                .order_by(ordering_req)
542                                                .build()
543                                                .map(Arc::new)
544                                        }
545                                    }
546                                }).transpose()?.ok_or_else(|| {
547                                    proto_error("Invalid AggregateExpr, missing aggregate_function")
548                                })
549                            }
550                            _ => internal_err!(
551                                "Invalid aggregate expression for AggregateExec"
552                            ),
553                        }
554                    })
555                    .collect::<Result<Vec<_>, _>>()?;
556
557                let limit = hash_agg
558                    .limit
559                    .as_ref()
560                    .map(|lit_value| lit_value.limit as usize);
561
562                let agg = AggregateExec::try_new(
563                    agg_mode,
564                    PhysicalGroupBy::new(group_expr, null_expr, groups),
565                    physical_aggr_expr,
566                    physical_filter_expr,
567                    input,
568                    physical_schema,
569                )?;
570
571                let agg = agg.with_limit(limit);
572
573                Ok(Arc::new(agg))
574            }
575            PhysicalPlanType::HashJoin(hashjoin) => {
576                let left: Arc<dyn ExecutionPlan> = into_physical_plan(
577                    &hashjoin.left,
578                    registry,
579                    runtime,
580                    extension_codec,
581                )?;
582                let right: Arc<dyn ExecutionPlan> = into_physical_plan(
583                    &hashjoin.right,
584                    registry,
585                    runtime,
586                    extension_codec,
587                )?;
588                let left_schema = left.schema();
589                let right_schema = right.schema();
590                let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
591                    .on
592                    .iter()
593                    .map(|col| {
594                        let left = parse_physical_expr(
595                            &col.left.clone().unwrap(),
596                            registry,
597                            left_schema.as_ref(),
598                            extension_codec,
599                        )?;
600                        let right = parse_physical_expr(
601                            &col.right.clone().unwrap(),
602                            registry,
603                            right_schema.as_ref(),
604                            extension_codec,
605                        )?;
606                        Ok((left, right))
607                    })
608                    .collect::<Result<_>>()?;
609                let join_type = protobuf::JoinType::try_from(hashjoin.join_type)
610                    .map_err(|_| {
611                        proto_error(format!(
612                            "Received a HashJoinNode message with unknown JoinType {}",
613                            hashjoin.join_type
614                        ))
615                    })?;
616                let filter = hashjoin
617                    .filter
618                    .as_ref()
619                    .map(|f| {
620                        let schema = f
621                            .schema
622                            .as_ref()
623                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
624                            .try_into()?;
625
626                        let expression = parse_physical_expr(
627                            f.expression.as_ref().ok_or_else(|| {
628                                proto_error("Unexpected empty filter expression")
629                            })?,
630                            registry, &schema,
631                            extension_codec,
632                        )?;
633                        let column_indices = f.column_indices
634                            .iter()
635                            .map(|i| {
636                                let side = protobuf::JoinSide::try_from(i.side)
637                                    .map_err(|_| proto_error(format!(
638                                        "Received a HashJoinNode message with JoinSide in Filter {}",
639                                        i.side))
640                                    )?;
641
642                                Ok(ColumnIndex {
643                                    index: i.index as usize,
644                                    side: side.into(),
645                                })
646                            })
647                            .collect::<Result<Vec<_>>>()?;
648
649                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
650                    })
651                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
652
653                let partition_mode = protobuf::PartitionMode::try_from(
654                    hashjoin.partition_mode,
655                )
656                .map_err(|_| {
657                    proto_error(format!(
658                        "Received a HashJoinNode message with unknown PartitionMode {}",
659                        hashjoin.partition_mode
660                    ))
661                })?;
662                let partition_mode = match partition_mode {
663                    protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
664                    protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
665                    protobuf::PartitionMode::Auto => PartitionMode::Auto,
666                };
667                let projection = if !hashjoin.projection.is_empty() {
668                    Some(
669                        hashjoin
670                            .projection
671                            .iter()
672                            .map(|i| *i as usize)
673                            .collect::<Vec<_>>(),
674                    )
675                } else {
676                    None
677                };
678                Ok(Arc::new(HashJoinExec::try_new(
679                    left,
680                    right,
681                    on,
682                    filter,
683                    &join_type.into(),
684                    projection,
685                    partition_mode,
686                    hashjoin.null_equals_null,
687                )?))
688            }
689            PhysicalPlanType::SymmetricHashJoin(sym_join) => {
690                let left = into_physical_plan(
691                    &sym_join.left,
692                    registry,
693                    runtime,
694                    extension_codec,
695                )?;
696                let right = into_physical_plan(
697                    &sym_join.right,
698                    registry,
699                    runtime,
700                    extension_codec,
701                )?;
702                let left_schema = left.schema();
703                let right_schema = right.schema();
704                let on = sym_join
705                    .on
706                    .iter()
707                    .map(|col| {
708                        let left = parse_physical_expr(
709                            &col.left.clone().unwrap(),
710                            registry,
711                            left_schema.as_ref(),
712                            extension_codec,
713                        )?;
714                        let right = parse_physical_expr(
715                            &col.right.clone().unwrap(),
716                            registry,
717                            right_schema.as_ref(),
718                            extension_codec,
719                        )?;
720                        Ok((left, right))
721                    })
722                    .collect::<Result<_>>()?;
723                let join_type = protobuf::JoinType::try_from(sym_join.join_type)
724                    .map_err(|_| {
725                        proto_error(format!(
726                            "Received a SymmetricHashJoin message with unknown JoinType {}",
727                            sym_join.join_type
728                        ))
729                    })?;
730                let filter = sym_join
731                    .filter
732                    .as_ref()
733                    .map(|f| {
734                        let schema = f
735                            .schema
736                            .as_ref()
737                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
738                            .try_into()?;
739
740                        let expression = parse_physical_expr(
741                            f.expression.as_ref().ok_or_else(|| {
742                                proto_error("Unexpected empty filter expression")
743                            })?,
744                            registry, &schema,
745                            extension_codec,
746                        )?;
747                        let column_indices = f.column_indices
748                            .iter()
749                            .map(|i| {
750                                let side = protobuf::JoinSide::try_from(i.side)
751                                    .map_err(|_| proto_error(format!(
752                                        "Received a HashJoinNode message with JoinSide in Filter {}",
753                                        i.side))
754                                    )?;
755
756                                Ok(ColumnIndex {
757                                    index: i.index as usize,
758                                    side: side.into(),
759                                })
760                            })
761                            .collect::<Result<_>>()?;
762
763                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
764                    })
765                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
766
767                let left_sort_exprs = parse_physical_sort_exprs(
768                    &sym_join.left_sort_exprs,
769                    registry,
770                    &left_schema,
771                    extension_codec,
772                )?;
773                let left_sort_exprs = if left_sort_exprs.is_empty() {
774                    None
775                } else {
776                    Some(left_sort_exprs)
777                };
778
779                let right_sort_exprs = parse_physical_sort_exprs(
780                    &sym_join.right_sort_exprs,
781                    registry,
782                    &right_schema,
783                    extension_codec,
784                )?;
785                let right_sort_exprs = if right_sort_exprs.is_empty() {
786                    None
787                } else {
788                    Some(right_sort_exprs)
789                };
790
791                let partition_mode =
792                    protobuf::StreamPartitionMode::try_from(sym_join.partition_mode).map_err(|_| {
793                        proto_error(format!(
794                            "Received a SymmetricHashJoin message with unknown PartitionMode {}",
795                            sym_join.partition_mode
796                        ))
797                    })?;
798                let partition_mode = match partition_mode {
799                    protobuf::StreamPartitionMode::SinglePartition => {
800                        StreamJoinPartitionMode::SinglePartition
801                    }
802                    protobuf::StreamPartitionMode::PartitionedExec => {
803                        StreamJoinPartitionMode::Partitioned
804                    }
805                };
806                SymmetricHashJoinExec::try_new(
807                    left,
808                    right,
809                    on,
810                    filter,
811                    &join_type.into(),
812                    sym_join.null_equals_null,
813                    left_sort_exprs,
814                    right_sort_exprs,
815                    partition_mode,
816                )
817                .map(|e| Arc::new(e) as _)
818            }
819            PhysicalPlanType::Union(union) => {
820                let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
821                for input in &union.inputs {
822                    inputs.push(input.try_into_physical_plan(
823                        registry,
824                        runtime,
825                        extension_codec,
826                    )?);
827                }
828                Ok(Arc::new(UnionExec::new(inputs)))
829            }
830            PhysicalPlanType::Interleave(interleave) => {
831                let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
832                for input in &interleave.inputs {
833                    inputs.push(input.try_into_physical_plan(
834                        registry,
835                        runtime,
836                        extension_codec,
837                    )?);
838                }
839                Ok(Arc::new(InterleaveExec::try_new(inputs)?))
840            }
841            PhysicalPlanType::CrossJoin(crossjoin) => {
842                let left: Arc<dyn ExecutionPlan> = into_physical_plan(
843                    &crossjoin.left,
844                    registry,
845                    runtime,
846                    extension_codec,
847                )?;
848                let right: Arc<dyn ExecutionPlan> = into_physical_plan(
849                    &crossjoin.right,
850                    registry,
851                    runtime,
852                    extension_codec,
853                )?;
854                Ok(Arc::new(CrossJoinExec::new(left, right)))
855            }
856            PhysicalPlanType::Empty(empty) => {
857                let schema = Arc::new(convert_required!(empty.schema)?);
858                Ok(Arc::new(EmptyExec::new(schema)))
859            }
860            PhysicalPlanType::PlaceholderRow(placeholder) => {
861                let schema = Arc::new(convert_required!(placeholder.schema)?);
862                Ok(Arc::new(PlaceholderRowExec::new(schema)))
863            }
864            PhysicalPlanType::Sort(sort) => {
865                let input: Arc<dyn ExecutionPlan> =
866                    into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
867                let exprs = sort
868                    .expr
869                    .iter()
870                    .map(|expr| {
871                        let expr = expr.expr_type.as_ref().ok_or_else(|| {
872                            proto_error(format!(
873                                "physical_plan::from_proto() Unexpected expr {self:?}"
874                            ))
875                        })?;
876                        if let ExprType::Sort(sort_expr) = expr {
877                            let expr = sort_expr
878                                .expr
879                                .as_ref()
880                                .ok_or_else(|| {
881                                    proto_error(format!(
882                                        "physical_plan::from_proto() Unexpected sort expr {self:?}"
883                                    ))
884                                })?
885                                .as_ref();
886                            Ok(PhysicalSortExpr {
887                                expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
888                                options: SortOptions {
889                                    descending: !sort_expr.asc,
890                                    nulls_first: sort_expr.nulls_first,
891                                },
892                            })
893                        } else {
894                            internal_err!(
895                                "physical_plan::from_proto() {self:?}"
896                            )
897                        }
898                    })
899                    .collect::<Result<LexOrdering, _>>()?;
900                let fetch = if sort.fetch < 0 {
901                    None
902                } else {
903                    Some(sort.fetch as usize)
904                };
905                let new_sort = SortExec::new(exprs, input)
906                    .with_fetch(fetch)
907                    .with_preserve_partitioning(sort.preserve_partitioning);
908
909                Ok(Arc::new(new_sort))
910            }
911            PhysicalPlanType::SortPreservingMerge(sort) => {
912                let input: Arc<dyn ExecutionPlan> =
913                    into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
914                let exprs = sort
915                    .expr
916                    .iter()
917                    .map(|expr| {
918                        let expr = expr.expr_type.as_ref().ok_or_else(|| {
919                            proto_error(format!(
920                                "physical_plan::from_proto() Unexpected expr {self:?}"
921                            ))
922                        })?;
923                        if let ExprType::Sort(sort_expr) = expr {
924                            let expr = sort_expr
925                                .expr
926                                .as_ref()
927                                .ok_or_else(|| {
928                                    proto_error(format!(
929                                        "physical_plan::from_proto() Unexpected sort expr {self:?}"
930                                    ))
931                                })?
932                                .as_ref();
933                            Ok(PhysicalSortExpr {
934                                expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?,
935                                options: SortOptions {
936                                    descending: !sort_expr.asc,
937                                    nulls_first: sort_expr.nulls_first,
938                                },
939                            })
940                        } else {
941                            internal_err!(
942                                "physical_plan::from_proto() {self:?}"
943                            )
944                        }
945                    })
946                    .collect::<Result<LexOrdering, _>>()?;
947                let fetch = if sort.fetch < 0 {
948                    None
949                } else {
950                    Some(sort.fetch as usize)
951                };
952                Ok(Arc::new(
953                    SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
954                ))
955            }
956            PhysicalPlanType::Extension(extension) => {
957                let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
958                    .inputs
959                    .iter()
960                    .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
961                    .collect::<Result<_>>()?;
962
963                let extension_node = extension_codec.try_decode(
964                    extension.node.as_slice(),
965                    &inputs,
966                    registry,
967                )?;
968
969                Ok(extension_node)
970            }
971            PhysicalPlanType::NestedLoopJoin(join) => {
972                let left: Arc<dyn ExecutionPlan> =
973                    into_physical_plan(&join.left, registry, runtime, extension_codec)?;
974                let right: Arc<dyn ExecutionPlan> =
975                    into_physical_plan(&join.right, registry, runtime, extension_codec)?;
976                let join_type =
977                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
978                        proto_error(format!(
979                            "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
980                            join.join_type
981                        ))
982                    })?;
983                let filter = join
984                    .filter
985                    .as_ref()
986                    .map(|f| {
987                        let schema = f
988                            .schema
989                            .as_ref()
990                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
991                            .try_into()?;
992
993                        let expression = parse_physical_expr(
994                            f.expression.as_ref().ok_or_else(|| {
995                                proto_error("Unexpected empty filter expression")
996                            })?,
997                            registry, &schema,
998                            extension_codec,
999                        )?;
1000                        let column_indices = f.column_indices
1001                            .iter()
1002                            .map(|i| {
1003                                let side = protobuf::JoinSide::try_from(i.side)
1004                                    .map_err(|_| proto_error(format!(
1005                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1006                                        i.side))
1007                                    )?;
1008
1009                                Ok(ColumnIndex {
1010                                    index: i.index as usize,
1011                                    side: side.into(),
1012                                })
1013                            })
1014                            .collect::<Result<Vec<_>>>()?;
1015
1016                        Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1017                    })
1018                    .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1019
1020                let projection = if !join.projection.is_empty() {
1021                    Some(
1022                        join.projection
1023                            .iter()
1024                            .map(|i| *i as usize)
1025                            .collect::<Vec<_>>(),
1026                    )
1027                } else {
1028                    None
1029                };
1030
1031                Ok(Arc::new(NestedLoopJoinExec::try_new(
1032                    left,
1033                    right,
1034                    filter,
1035                    &join_type.into(),
1036                    projection,
1037                )?))
1038            }
1039            PhysicalPlanType::Analyze(analyze) => {
1040                let input: Arc<dyn ExecutionPlan> = into_physical_plan(
1041                    &analyze.input,
1042                    registry,
1043                    runtime,
1044                    extension_codec,
1045                )?;
1046                Ok(Arc::new(AnalyzeExec::new(
1047                    analyze.verbose,
1048                    analyze.show_statistics,
1049                    input,
1050                    Arc::new(convert_required!(analyze.schema)?),
1051                )))
1052            }
1053            PhysicalPlanType::JsonSink(sink) => {
1054                let input =
1055                    into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1056
1057                let data_sink: JsonSink = sink
1058                    .sink
1059                    .as_ref()
1060                    .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1061                    .try_into()?;
1062                let sink_schema = input.schema();
1063                let sort_order = sink
1064                    .sort_order
1065                    .as_ref()
1066                    .map(|collection| {
1067                        parse_physical_sort_exprs(
1068                            &collection.physical_sort_expr_nodes,
1069                            registry,
1070                            &sink_schema,
1071                            extension_codec,
1072                        )
1073                        .map(LexRequirement::from)
1074                    })
1075                    .transpose()?;
1076                Ok(Arc::new(DataSinkExec::new(
1077                    input,
1078                    Arc::new(data_sink),
1079                    sort_order,
1080                )))
1081            }
1082            PhysicalPlanType::CsvSink(sink) => {
1083                let input =
1084                    into_physical_plan(&sink.input, registry, runtime, extension_codec)?;
1085
1086                let data_sink: CsvSink = sink
1087                    .sink
1088                    .as_ref()
1089                    .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1090                    .try_into()?;
1091                let sink_schema = input.schema();
1092                let sort_order = sink
1093                    .sort_order
1094                    .as_ref()
1095                    .map(|collection| {
1096                        parse_physical_sort_exprs(
1097                            &collection.physical_sort_expr_nodes,
1098                            registry,
1099                            &sink_schema,
1100                            extension_codec,
1101                        )
1102                        .map(LexRequirement::from)
1103                    })
1104                    .transpose()?;
1105                Ok(Arc::new(DataSinkExec::new(
1106                    input,
1107                    Arc::new(data_sink),
1108                    sort_order,
1109                )))
1110            }
1111            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
1112            PhysicalPlanType::ParquetSink(sink) => {
1113                #[cfg(feature = "parquet")]
1114                {
1115                    let input = into_physical_plan(
1116                        &sink.input,
1117                        registry,
1118                        runtime,
1119                        extension_codec,
1120                    )?;
1121
1122                    let data_sink: ParquetSink = sink
1123                        .sink
1124                        .as_ref()
1125                        .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1126                        .try_into()?;
1127                    let sink_schema = input.schema();
1128                    let sort_order = sink
1129                        .sort_order
1130                        .as_ref()
1131                        .map(|collection| {
1132                            parse_physical_sort_exprs(
1133                                &collection.physical_sort_expr_nodes,
1134                                registry,
1135                                &sink_schema,
1136                                extension_codec,
1137                            )
1138                            .map(LexRequirement::from)
1139                        })
1140                        .transpose()?;
1141                    Ok(Arc::new(DataSinkExec::new(
1142                        input,
1143                        Arc::new(data_sink),
1144                        sort_order,
1145                    )))
1146                }
1147                #[cfg(not(feature = "parquet"))]
1148                panic!("Trying to use ParquetSink without `parquet` feature enabled");
1149            }
1150            PhysicalPlanType::Unnest(unnest) => {
1151                let input = into_physical_plan(
1152                    &unnest.input,
1153                    registry,
1154                    runtime,
1155                    extension_codec,
1156                )?;
1157
1158                Ok(Arc::new(UnnestExec::new(
1159                    input,
1160                    unnest
1161                        .list_type_columns
1162                        .iter()
1163                        .map(|c| ListUnnest {
1164                            index_in_input_schema: c.index_in_input_schema as _,
1165                            depth: c.depth as _,
1166                        })
1167                        .collect(),
1168                    unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
1169                    Arc::new(convert_required!(unnest.schema)?),
1170                    into_required!(unnest.options)?,
1171                )))
1172            }
1173        }
1174    }
1175
1176    fn try_from_physical_plan(
1177        plan: Arc<dyn ExecutionPlan>,
1178        extension_codec: &dyn PhysicalExtensionCodec,
1179    ) -> Result<Self>
1180    where
1181        Self: Sized,
1182    {
1183        let plan_clone = Arc::clone(&plan);
1184        let plan = plan.as_any();
1185
1186        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
1187            return Ok(protobuf::PhysicalPlanNode {
1188                physical_plan_type: Some(PhysicalPlanType::Explain(
1189                    protobuf::ExplainExecNode {
1190                        schema: Some(exec.schema().as_ref().try_into()?),
1191                        stringified_plans: exec
1192                            .stringified_plans()
1193                            .iter()
1194                            .map(|plan| plan.into())
1195                            .collect(),
1196                        verbose: exec.verbose(),
1197                    },
1198                )),
1199            });
1200        }
1201
1202        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
1203            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1204                exec.input().to_owned(),
1205                extension_codec,
1206            )?;
1207            let expr = exec
1208                .expr()
1209                .iter()
1210                .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1211                .collect::<Result<Vec<_>>>()?;
1212            let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
1213            return Ok(protobuf::PhysicalPlanNode {
1214                physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
1215                    protobuf::ProjectionExecNode {
1216                        input: Some(Box::new(input)),
1217                        expr,
1218                        expr_name,
1219                    },
1220                ))),
1221            });
1222        }
1223
1224        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
1225            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1226                exec.input().to_owned(),
1227                extension_codec,
1228            )?;
1229            return Ok(protobuf::PhysicalPlanNode {
1230                physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
1231                    protobuf::AnalyzeExecNode {
1232                        verbose: exec.verbose(),
1233                        show_statistics: exec.show_statistics(),
1234                        input: Some(Box::new(input)),
1235                        schema: Some(exec.schema().as_ref().try_into()?),
1236                    },
1237                ))),
1238            });
1239        }
1240
1241        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
1242            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1243                exec.input().to_owned(),
1244                extension_codec,
1245            )?;
1246            return Ok(protobuf::PhysicalPlanNode {
1247                physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
1248                    protobuf::FilterExecNode {
1249                        input: Some(Box::new(input)),
1250                        expr: Some(serialize_physical_expr(
1251                            exec.predicate(),
1252                            extension_codec,
1253                        )?),
1254                        default_filter_selectivity: exec.default_selectivity() as u32,
1255                        projection: exec
1256                            .projection()
1257                            .as_ref()
1258                            .map_or_else(Vec::new, |v| {
1259                                v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1260                            }),
1261                    },
1262                ))),
1263            });
1264        }
1265
1266        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
1267            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1268                limit.input().to_owned(),
1269                extension_codec,
1270            )?;
1271
1272            return Ok(protobuf::PhysicalPlanNode {
1273                physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
1274                    protobuf::GlobalLimitExecNode {
1275                        input: Some(Box::new(input)),
1276                        skip: limit.skip() as u32,
1277                        fetch: match limit.fetch() {
1278                            Some(n) => n as i64,
1279                            _ => -1, // no limit
1280                        },
1281                    },
1282                ))),
1283            });
1284        }
1285
1286        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
1287            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1288                limit.input().to_owned(),
1289                extension_codec,
1290            )?;
1291            return Ok(protobuf::PhysicalPlanNode {
1292                physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
1293                    protobuf::LocalLimitExecNode {
1294                        input: Some(Box::new(input)),
1295                        fetch: limit.fetch() as u32,
1296                    },
1297                ))),
1298            });
1299        }
1300
1301        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
1302            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1303                exec.left().to_owned(),
1304                extension_codec,
1305            )?;
1306            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1307                exec.right().to_owned(),
1308                extension_codec,
1309            )?;
1310            let on: Vec<protobuf::JoinOn> = exec
1311                .on()
1312                .iter()
1313                .map(|tuple| {
1314                    let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1315                    let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1316                    Ok::<_, DataFusionError>(protobuf::JoinOn {
1317                        left: Some(l),
1318                        right: Some(r),
1319                    })
1320                })
1321                .collect::<Result<_>>()?;
1322            let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1323            let filter = exec
1324                .filter()
1325                .as_ref()
1326                .map(|f| {
1327                    let expression =
1328                        serialize_physical_expr(f.expression(), extension_codec)?;
1329                    let column_indices = f
1330                        .column_indices()
1331                        .iter()
1332                        .map(|i| {
1333                            let side: protobuf::JoinSide = i.side.to_owned().into();
1334                            protobuf::ColumnIndex {
1335                                index: i.index as u32,
1336                                side: side.into(),
1337                            }
1338                        })
1339                        .collect();
1340                    let schema = f.schema().as_ref().try_into()?;
1341                    Ok(protobuf::JoinFilter {
1342                        expression: Some(expression),
1343                        column_indices,
1344                        schema: Some(schema),
1345                    })
1346                })
1347                .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
1348
1349            let partition_mode = match exec.partition_mode() {
1350                PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
1351                PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
1352                PartitionMode::Auto => protobuf::PartitionMode::Auto,
1353            };
1354
1355            return Ok(protobuf::PhysicalPlanNode {
1356                physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
1357                    protobuf::HashJoinExecNode {
1358                        left: Some(Box::new(left)),
1359                        right: Some(Box::new(right)),
1360                        on,
1361                        join_type: join_type.into(),
1362                        partition_mode: partition_mode.into(),
1363                        null_equals_null: exec.null_equals_null(),
1364                        filter,
1365                        projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
1366                            v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1367                        }),
1368                    },
1369                ))),
1370            });
1371        }
1372
1373        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
1374            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1375                exec.left().to_owned(),
1376                extension_codec,
1377            )?;
1378            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1379                exec.right().to_owned(),
1380                extension_codec,
1381            )?;
1382            let on = exec
1383                .on()
1384                .iter()
1385                .map(|tuple| {
1386                    let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1387                    let r = serialize_physical_expr(&tuple.1, extension_codec)?;
1388                    Ok::<_, DataFusionError>(protobuf::JoinOn {
1389                        left: Some(l),
1390                        right: Some(r),
1391                    })
1392                })
1393                .collect::<Result<_>>()?;
1394            let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1395            let filter = exec
1396                .filter()
1397                .as_ref()
1398                .map(|f| {
1399                    let expression =
1400                        serialize_physical_expr(f.expression(), extension_codec)?;
1401                    let column_indices = f
1402                        .column_indices()
1403                        .iter()
1404                        .map(|i| {
1405                            let side: protobuf::JoinSide = i.side.to_owned().into();
1406                            protobuf::ColumnIndex {
1407                                index: i.index as u32,
1408                                side: side.into(),
1409                            }
1410                        })
1411                        .collect();
1412                    let schema = f.schema().as_ref().try_into()?;
1413                    Ok(protobuf::JoinFilter {
1414                        expression: Some(expression),
1415                        column_indices,
1416                        schema: Some(schema),
1417                    })
1418                })
1419                .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
1420
1421            let partition_mode = match exec.partition_mode() {
1422                StreamJoinPartitionMode::SinglePartition => {
1423                    protobuf::StreamPartitionMode::SinglePartition
1424                }
1425                StreamJoinPartitionMode::Partitioned => {
1426                    protobuf::StreamPartitionMode::PartitionedExec
1427                }
1428            };
1429
1430            let left_sort_exprs = exec
1431                .left_sort_exprs()
1432                .map(|exprs| {
1433                    exprs
1434                        .iter()
1435                        .map(|expr| {
1436                            Ok(protobuf::PhysicalSortExprNode {
1437                                expr: Some(Box::new(serialize_physical_expr(
1438                                    &expr.expr,
1439                                    extension_codec,
1440                                )?)),
1441                                asc: !expr.options.descending,
1442                                nulls_first: expr.options.nulls_first,
1443                            })
1444                        })
1445                        .collect::<Result<Vec<_>>>()
1446                })
1447                .transpose()?
1448                .unwrap_or(vec![]);
1449
1450            let right_sort_exprs = exec
1451                .right_sort_exprs()
1452                .map(|exprs| {
1453                    exprs
1454                        .iter()
1455                        .map(|expr| {
1456                            Ok(protobuf::PhysicalSortExprNode {
1457                                expr: Some(Box::new(serialize_physical_expr(
1458                                    &expr.expr,
1459                                    extension_codec,
1460                                )?)),
1461                                asc: !expr.options.descending,
1462                                nulls_first: expr.options.nulls_first,
1463                            })
1464                        })
1465                        .collect::<Result<Vec<_>>>()
1466                })
1467                .transpose()?
1468                .unwrap_or(vec![]);
1469
1470            return Ok(protobuf::PhysicalPlanNode {
1471                physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
1472                    protobuf::SymmetricHashJoinExecNode {
1473                        left: Some(Box::new(left)),
1474                        right: Some(Box::new(right)),
1475                        on,
1476                        join_type: join_type.into(),
1477                        partition_mode: partition_mode.into(),
1478                        null_equals_null: exec.null_equals_null(),
1479                        left_sort_exprs,
1480                        right_sort_exprs,
1481                        filter,
1482                    },
1483                ))),
1484            });
1485        }
1486
1487        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
1488            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1489                exec.left().to_owned(),
1490                extension_codec,
1491            )?;
1492            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1493                exec.right().to_owned(),
1494                extension_codec,
1495            )?;
1496            return Ok(protobuf::PhysicalPlanNode {
1497                physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
1498                    protobuf::CrossJoinExecNode {
1499                        left: Some(Box::new(left)),
1500                        right: Some(Box::new(right)),
1501                    },
1502                ))),
1503            });
1504        }
1505        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
1506            let groups: Vec<bool> = exec
1507                .group_expr()
1508                .groups()
1509                .iter()
1510                .flatten()
1511                .copied()
1512                .collect();
1513
1514            let group_names = exec
1515                .group_expr()
1516                .expr()
1517                .iter()
1518                .map(|expr| expr.1.to_owned())
1519                .collect();
1520
1521            let filter = exec
1522                .filter_expr()
1523                .iter()
1524                .map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
1525                .collect::<Result<Vec<_>>>()?;
1526
1527            let agg = exec
1528                .aggr_expr()
1529                .iter()
1530                .map(|expr| {
1531                    serialize_physical_aggr_expr(expr.to_owned(), extension_codec)
1532                })
1533                .collect::<Result<Vec<_>>>()?;
1534
1535            let agg_names = exec
1536                .aggr_expr()
1537                .iter()
1538                .map(|expr| expr.name().to_string())
1539                .collect::<Vec<_>>();
1540
1541            let agg_mode = match exec.mode() {
1542                AggregateMode::Partial => protobuf::AggregateMode::Partial,
1543                AggregateMode::Final => protobuf::AggregateMode::Final,
1544                AggregateMode::FinalPartitioned => {
1545                    protobuf::AggregateMode::FinalPartitioned
1546                }
1547                AggregateMode::Single => protobuf::AggregateMode::Single,
1548                AggregateMode::SinglePartitioned => {
1549                    protobuf::AggregateMode::SinglePartitioned
1550                }
1551            };
1552            let input_schema = exec.input_schema();
1553            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1554                exec.input().to_owned(),
1555                extension_codec,
1556            )?;
1557
1558            let null_expr = exec
1559                .group_expr()
1560                .null_expr()
1561                .iter()
1562                .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1563                .collect::<Result<Vec<_>>>()?;
1564
1565            let group_expr = exec
1566                .group_expr()
1567                .expr()
1568                .iter()
1569                .map(|expr| serialize_physical_expr(&expr.0, extension_codec))
1570                .collect::<Result<Vec<_>>>()?;
1571
1572            let limit = exec.limit().map(|value| protobuf::AggLimit {
1573                limit: value as u64,
1574            });
1575
1576            return Ok(protobuf::PhysicalPlanNode {
1577                physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
1578                    protobuf::AggregateExecNode {
1579                        group_expr,
1580                        group_expr_name: group_names,
1581                        aggr_expr: agg,
1582                        filter_expr: filter,
1583                        aggr_expr_name: agg_names,
1584                        mode: agg_mode as i32,
1585                        input: Some(Box::new(input)),
1586                        input_schema: Some(input_schema.as_ref().try_into()?),
1587                        null_expr,
1588                        groups,
1589                        limit,
1590                    },
1591                ))),
1592            });
1593        }
1594
1595        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
1596            let schema = empty.schema().as_ref().try_into()?;
1597            return Ok(protobuf::PhysicalPlanNode {
1598                physical_plan_type: Some(PhysicalPlanType::Empty(
1599                    protobuf::EmptyExecNode {
1600                        schema: Some(schema),
1601                    },
1602                )),
1603            });
1604        }
1605
1606        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
1607            let schema = empty.schema().as_ref().try_into()?;
1608            return Ok(protobuf::PhysicalPlanNode {
1609                physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
1610                    protobuf::PlaceholderRowExecNode {
1611                        schema: Some(schema),
1612                    },
1613                )),
1614            });
1615        }
1616
1617        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
1618            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1619                coalesce_batches.input().to_owned(),
1620                extension_codec,
1621            )?;
1622            return Ok(protobuf::PhysicalPlanNode {
1623                physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
1624                    protobuf::CoalesceBatchesExecNode {
1625                        input: Some(Box::new(input)),
1626                        target_batch_size: coalesce_batches.target_batch_size() as u32,
1627                        fetch: coalesce_batches.fetch().map(|n| n as u32),
1628                    },
1629                ))),
1630            });
1631        }
1632
1633        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
1634            let data_source = data_source_exec.data_source();
1635            if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>()
1636            {
1637                let source = maybe_csv.file_source();
1638                if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
1639                    return Ok(protobuf::PhysicalPlanNode {
1640                        physical_plan_type: Some(PhysicalPlanType::CsvScan(
1641                            protobuf::CsvScanExecNode {
1642                                base_conf: Some(serialize_file_scan_config(
1643                                    maybe_csv,
1644                                    extension_codec,
1645                                )?),
1646                                has_header: csv_config.has_header(),
1647                                delimiter: byte_to_string(
1648                                    csv_config.delimiter(),
1649                                    "delimiter",
1650                                )?,
1651                                quote: byte_to_string(csv_config.quote(), "quote")?,
1652                                optional_escape: if let Some(escape) = csv_config.escape()
1653                                {
1654                                    Some(
1655                                        protobuf::csv_scan_exec_node::OptionalEscape::Escape(
1656                                            byte_to_string(escape, "escape")?,
1657                                        ),
1658                                    )
1659                                } else {
1660                                    None
1661                                },
1662                                optional_comment: if let Some(comment) =
1663                                    csv_config.comment()
1664                                {
1665                                    Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
1666                                        byte_to_string(comment, "comment")?,
1667                                    ))
1668                                } else {
1669                                    None
1670                                },
1671                                newlines_in_values: maybe_csv.newlines_in_values(),
1672                            },
1673                        )),
1674                    });
1675                }
1676            }
1677        }
1678
1679        #[cfg(feature = "parquet")]
1680        if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
1681            let data_source_exec = exec.data_source();
1682            if let Some(maybe_parquet) =
1683                data_source_exec.as_any().downcast_ref::<FileScanConfig>()
1684            {
1685                let source = maybe_parquet.file_source();
1686                if let Some(conf) = source.as_any().downcast_ref::<ParquetSource>() {
1687                    let predicate = conf
1688                        .predicate()
1689                        .map(|pred| serialize_physical_expr(pred, extension_codec))
1690                        .transpose()?;
1691                    return Ok(protobuf::PhysicalPlanNode {
1692                        physical_plan_type: Some(PhysicalPlanType::ParquetScan(
1693                            protobuf::ParquetScanExecNode {
1694                                base_conf: Some(serialize_file_scan_config(
1695                                    maybe_parquet,
1696                                    extension_codec,
1697                                )?),
1698                                predicate,
1699                                parquet_options: Some(
1700                                    conf.table_parquet_options().try_into()?,
1701                                ),
1702                            },
1703                        )),
1704                    });
1705                }
1706            }
1707        }
1708
1709        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
1710            let data_source = data_source_exec.data_source();
1711            if let Some(maybe_avro) =
1712                data_source.as_any().downcast_ref::<FileScanConfig>()
1713            {
1714                let source = maybe_avro.file_source();
1715                if source.as_any().downcast_ref::<AvroSource>().is_some() {
1716                    return Ok(protobuf::PhysicalPlanNode {
1717                        physical_plan_type: Some(PhysicalPlanType::AvroScan(
1718                            protobuf::AvroScanExecNode {
1719                                base_conf: Some(serialize_file_scan_config(
1720                                    maybe_avro,
1721                                    extension_codec,
1722                                )?),
1723                            },
1724                        )),
1725                    });
1726                }
1727            }
1728        }
1729
1730        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
1731            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1732                exec.input().to_owned(),
1733                extension_codec,
1734            )?;
1735            return Ok(protobuf::PhysicalPlanNode {
1736                physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
1737                    protobuf::CoalescePartitionsExecNode {
1738                        input: Some(Box::new(input)),
1739                    },
1740                ))),
1741            });
1742        }
1743
1744        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
1745            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1746                exec.input().to_owned(),
1747                extension_codec,
1748            )?;
1749
1750            let pb_partitioning =
1751                serialize_partitioning(exec.partitioning(), extension_codec)?;
1752
1753            return Ok(protobuf::PhysicalPlanNode {
1754                physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
1755                    protobuf::RepartitionExecNode {
1756                        input: Some(Box::new(input)),
1757                        partitioning: Some(pb_partitioning),
1758                    },
1759                ))),
1760            });
1761        }
1762
1763        if let Some(exec) = plan.downcast_ref::<SortExec>() {
1764            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1765                exec.input().to_owned(),
1766                extension_codec,
1767            )?;
1768            let expr = exec
1769                .expr()
1770                .iter()
1771                .map(|expr| {
1772                    let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
1773                        expr: Some(Box::new(serialize_physical_expr(
1774                            &expr.expr,
1775                            extension_codec,
1776                        )?)),
1777                        asc: !expr.options.descending,
1778                        nulls_first: expr.options.nulls_first,
1779                    });
1780                    Ok(protobuf::PhysicalExprNode {
1781                        expr_type: Some(ExprType::Sort(sort_expr)),
1782                    })
1783                })
1784                .collect::<Result<Vec<_>>>()?;
1785            return Ok(protobuf::PhysicalPlanNode {
1786                physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
1787                    protobuf::SortExecNode {
1788                        input: Some(Box::new(input)),
1789                        expr,
1790                        fetch: match exec.fetch() {
1791                            Some(n) => n as i64,
1792                            _ => -1,
1793                        },
1794                        preserve_partitioning: exec.preserve_partitioning(),
1795                    },
1796                ))),
1797            });
1798        }
1799
1800        if let Some(union) = plan.downcast_ref::<UnionExec>() {
1801            let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
1802            for input in union.inputs() {
1803                inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
1804                    input.to_owned(),
1805                    extension_codec,
1806                )?);
1807            }
1808            return Ok(protobuf::PhysicalPlanNode {
1809                physical_plan_type: Some(PhysicalPlanType::Union(
1810                    protobuf::UnionExecNode { inputs },
1811                )),
1812            });
1813        }
1814
1815        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
1816            let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
1817            for input in interleave.inputs() {
1818                inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
1819                    input.to_owned(),
1820                    extension_codec,
1821                )?);
1822            }
1823            return Ok(protobuf::PhysicalPlanNode {
1824                physical_plan_type: Some(PhysicalPlanType::Interleave(
1825                    protobuf::InterleaveExecNode { inputs },
1826                )),
1827            });
1828        }
1829
1830        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
1831            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1832                exec.input().to_owned(),
1833                extension_codec,
1834            )?;
1835            let expr = exec
1836                .expr()
1837                .iter()
1838                .map(|expr| {
1839                    let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
1840                        expr: Some(Box::new(serialize_physical_expr(
1841                            &expr.expr,
1842                            extension_codec,
1843                        )?)),
1844                        asc: !expr.options.descending,
1845                        nulls_first: expr.options.nulls_first,
1846                    });
1847                    Ok(protobuf::PhysicalExprNode {
1848                        expr_type: Some(ExprType::Sort(sort_expr)),
1849                    })
1850                })
1851                .collect::<Result<Vec<_>>>()?;
1852            return Ok(protobuf::PhysicalPlanNode {
1853                physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(
1854                    Box::new(protobuf::SortPreservingMergeExecNode {
1855                        input: Some(Box::new(input)),
1856                        expr,
1857                        fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
1858                    }),
1859                )),
1860            });
1861        }
1862
1863        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
1864            let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
1865                exec.left().to_owned(),
1866                extension_codec,
1867            )?;
1868            let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
1869                exec.right().to_owned(),
1870                extension_codec,
1871            )?;
1872
1873            let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
1874            let filter = exec
1875                .filter()
1876                .as_ref()
1877                .map(|f| {
1878                    let expression =
1879                        serialize_physical_expr(f.expression(), extension_codec)?;
1880                    let column_indices = f
1881                        .column_indices()
1882                        .iter()
1883                        .map(|i| {
1884                            let side: protobuf::JoinSide = i.side.to_owned().into();
1885                            protobuf::ColumnIndex {
1886                                index: i.index as u32,
1887                                side: side.into(),
1888                            }
1889                        })
1890                        .collect();
1891                    let schema = f.schema().as_ref().try_into()?;
1892                    Ok(protobuf::JoinFilter {
1893                        expression: Some(expression),
1894                        column_indices,
1895                        schema: Some(schema),
1896                    })
1897                })
1898                .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
1899
1900            return Ok(protobuf::PhysicalPlanNode {
1901                physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
1902                    protobuf::NestedLoopJoinExecNode {
1903                        left: Some(Box::new(left)),
1904                        right: Some(Box::new(right)),
1905                        join_type: join_type.into(),
1906                        filter,
1907                        projection: exec.projection().map_or_else(Vec::new, |v| {
1908                            v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
1909                        }),
1910                    },
1911                ))),
1912            });
1913        }
1914
1915        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
1916            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1917                exec.input().to_owned(),
1918                extension_codec,
1919            )?;
1920
1921            let window_expr = exec
1922                .window_expr()
1923                .iter()
1924                .map(|e| serialize_physical_window_expr(e, extension_codec))
1925                .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
1926
1927            let partition_keys = exec
1928                .partition_keys()
1929                .iter()
1930                .map(|e| serialize_physical_expr(e, extension_codec))
1931                .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
1932
1933            return Ok(protobuf::PhysicalPlanNode {
1934                physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
1935                    protobuf::WindowAggExecNode {
1936                        input: Some(Box::new(input)),
1937                        window_expr,
1938                        partition_keys,
1939                        input_order_mode: None,
1940                    },
1941                ))),
1942            });
1943        }
1944
1945        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
1946            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1947                exec.input().to_owned(),
1948                extension_codec,
1949            )?;
1950
1951            let window_expr = exec
1952                .window_expr()
1953                .iter()
1954                .map(|e| serialize_physical_window_expr(e, extension_codec))
1955                .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
1956
1957            let partition_keys = exec
1958                .partition_keys()
1959                .iter()
1960                .map(|e| serialize_physical_expr(e, extension_codec))
1961                .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
1962
1963            let input_order_mode = match &exec.input_order_mode {
1964                InputOrderMode::Linear => window_agg_exec_node::InputOrderMode::Linear(
1965                    protobuf::EmptyMessage {},
1966                ),
1967                InputOrderMode::PartiallySorted(columns) => {
1968                    window_agg_exec_node::InputOrderMode::PartiallySorted(
1969                        protobuf::PartiallySortedInputOrderMode {
1970                            columns: columns.iter().map(|c| *c as u64).collect(),
1971                        },
1972                    )
1973                }
1974                InputOrderMode::Sorted => window_agg_exec_node::InputOrderMode::Sorted(
1975                    protobuf::EmptyMessage {},
1976                ),
1977            };
1978
1979            return Ok(protobuf::PhysicalPlanNode {
1980                physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
1981                    protobuf::WindowAggExecNode {
1982                        input: Some(Box::new(input)),
1983                        window_expr,
1984                        partition_keys,
1985                        input_order_mode: Some(input_order_mode),
1986                    },
1987                ))),
1988            });
1989        }
1990
1991        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
1992            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
1993                exec.input().to_owned(),
1994                extension_codec,
1995            )?;
1996            let sort_order = match exec.sort_order() {
1997                Some(requirements) => {
1998                    let expr = requirements
1999                        .iter()
2000                        .map(|requirement| {
2001                            let expr: PhysicalSortExpr = requirement.to_owned().into();
2002                            let sort_expr = protobuf::PhysicalSortExprNode {
2003                                expr: Some(Box::new(serialize_physical_expr(
2004                                    &expr.expr,
2005                                    extension_codec,
2006                                )?)),
2007                                asc: !expr.options.descending,
2008                                nulls_first: expr.options.nulls_first,
2009                            };
2010                            Ok(sort_expr)
2011                        })
2012                        .collect::<Result<Vec<_>>>()?;
2013                    Some(protobuf::PhysicalSortExprNodeCollection {
2014                        physical_sort_expr_nodes: expr,
2015                    })
2016                }
2017                None => None,
2018            };
2019
2020            if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
2021                return Ok(protobuf::PhysicalPlanNode {
2022                    physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
2023                        protobuf::JsonSinkExecNode {
2024                            input: Some(Box::new(input)),
2025                            sink: Some(sink.try_into()?),
2026                            sink_schema: Some(exec.schema().as_ref().try_into()?),
2027                            sort_order,
2028                        },
2029                    ))),
2030                });
2031            }
2032
2033            if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
2034                return Ok(protobuf::PhysicalPlanNode {
2035                    physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
2036                        protobuf::CsvSinkExecNode {
2037                            input: Some(Box::new(input)),
2038                            sink: Some(sink.try_into()?),
2039                            sink_schema: Some(exec.schema().as_ref().try_into()?),
2040                            sort_order,
2041                        },
2042                    ))),
2043                });
2044            }
2045
2046            #[cfg(feature = "parquet")]
2047            if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
2048                return Ok(protobuf::PhysicalPlanNode {
2049                    physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
2050                        protobuf::ParquetSinkExecNode {
2051                            input: Some(Box::new(input)),
2052                            sink: Some(sink.try_into()?),
2053                            sink_schema: Some(exec.schema().as_ref().try_into()?),
2054                            sort_order,
2055                        },
2056                    ))),
2057                });
2058            }
2059
2060            // If unknown DataSink then let extension handle it
2061        }
2062
2063        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
2064            let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
2065                exec.input().to_owned(),
2066                extension_codec,
2067            )?;
2068
2069            return Ok(protobuf::PhysicalPlanNode {
2070                physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
2071                    protobuf::UnnestExecNode {
2072                        input: Some(Box::new(input)),
2073                        schema: Some(exec.schema().try_into()?),
2074                        list_type_columns: exec
2075                            .list_column_indices()
2076                            .iter()
2077                            .map(|c| ProtoListUnnest {
2078                                index_in_input_schema: c.index_in_input_schema as _,
2079                                depth: c.depth as _,
2080                            })
2081                            .collect(),
2082                        struct_type_columns: exec
2083                            .struct_column_indices()
2084                            .iter()
2085                            .map(|c| *c as _)
2086                            .collect(),
2087                        options: Some(exec.options().into()),
2088                    },
2089                ))),
2090            });
2091        }
2092
2093        let mut buf: Vec<u8> = vec![];
2094        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
2095            Ok(_) => {
2096                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
2097                    .children()
2098                    .into_iter()
2099                    .cloned()
2100                    .map(|i| {
2101                        protobuf::PhysicalPlanNode::try_from_physical_plan(
2102                            i,
2103                            extension_codec,
2104                        )
2105                    })
2106                    .collect::<Result<_>>()?;
2107
2108                Ok(protobuf::PhysicalPlanNode {
2109                    physical_plan_type: Some(PhysicalPlanType::Extension(
2110                        protobuf::PhysicalExtensionNode { node: buf, inputs },
2111                    )),
2112                })
2113            }
2114            Err(e) => internal_err!(
2115                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
2116            ),
2117        }
2118    }
2119}
2120
2121pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
2122    fn try_decode(buf: &[u8]) -> Result<Self>
2123    where
2124        Self: Sized;
2125
2126    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
2127    where
2128        B: BufMut,
2129        Self: Sized;
2130
2131    fn try_into_physical_plan(
2132        &self,
2133        registry: &dyn FunctionRegistry,
2134        runtime: &RuntimeEnv,
2135        extension_codec: &dyn PhysicalExtensionCodec,
2136    ) -> Result<Arc<dyn ExecutionPlan>>;
2137
2138    fn try_from_physical_plan(
2139        plan: Arc<dyn ExecutionPlan>,
2140        extension_codec: &dyn PhysicalExtensionCodec,
2141    ) -> Result<Self>
2142    where
2143        Self: Sized;
2144}
2145
2146pub trait PhysicalExtensionCodec: Debug + Send + Sync {
2147    fn try_decode(
2148        &self,
2149        buf: &[u8],
2150        inputs: &[Arc<dyn ExecutionPlan>],
2151        registry: &dyn FunctionRegistry,
2152    ) -> Result<Arc<dyn ExecutionPlan>>;
2153
2154    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
2155
2156    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
2157        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
2158    }
2159
2160    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
2161        Ok(())
2162    }
2163
2164    fn try_decode_expr(
2165        &self,
2166        _buf: &[u8],
2167        _inputs: &[Arc<dyn PhysicalExpr>],
2168    ) -> Result<Arc<dyn PhysicalExpr>> {
2169        not_impl_err!("PhysicalExtensionCodec is not provided")
2170    }
2171
2172    fn try_encode_expr(
2173        &self,
2174        _node: &Arc<dyn PhysicalExpr>,
2175        _buf: &mut Vec<u8>,
2176    ) -> Result<()> {
2177        not_impl_err!("PhysicalExtensionCodec is not provided")
2178    }
2179
2180    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
2181        not_impl_err!(
2182            "PhysicalExtensionCodec is not provided for aggregate function {name}"
2183        )
2184    }
2185
2186    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
2187        Ok(())
2188    }
2189
2190    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
2191        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
2192    }
2193
2194    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
2195        Ok(())
2196    }
2197}
2198
/// Stateless, field-less codec type.
///
/// It carries no data, so it is cheap to create, copy and compare by
/// construction; `Default`, `Clone` and `Copy` are derived so callers can
/// obtain one via `Default::default()` and pass it around by value.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultPhysicalExtensionCodec {}
2201
2202impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
2203    fn try_decode(
2204        &self,
2205        _buf: &[u8],
2206        _inputs: &[Arc<dyn ExecutionPlan>],
2207        _registry: &dyn FunctionRegistry,
2208    ) -> Result<Arc<dyn ExecutionPlan>> {
2209        not_impl_err!("PhysicalExtensionCodec is not provided")
2210    }
2211
2212    fn try_encode(
2213        &self,
2214        _node: Arc<dyn ExecutionPlan>,
2215        _buf: &mut Vec<u8>,
2216    ) -> Result<()> {
2217        not_impl_err!("PhysicalExtensionCodec is not provided")
2218    }
2219}
2220
2221fn into_physical_plan(
2222    node: &Option<Box<protobuf::PhysicalPlanNode>>,
2223    registry: &dyn FunctionRegistry,
2224    runtime: &RuntimeEnv,
2225    extension_codec: &dyn PhysicalExtensionCodec,
2226) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
2227    if let Some(field) = node {
2228        field.try_into_physical_plan(registry, runtime, extension_codec)
2229    } else {
2230        Err(proto_error("Missing required field in protobuf"))
2231    }
2232}