datafusion_proto/logical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, listing_table_scan_node::FileFormatType,
31        logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32    },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
37use datafusion_catalog::cte_worktable::CteWorkTable;
38use datafusion_common::file_options::file_type::FileType;
39use datafusion_common::{
40    context, internal_datafusion_err, internal_err, not_impl_err, plan_err, Result,
41    TableReference, ToDFSchema,
42};
43use datafusion_datasource::file_format::FileFormat;
44use datafusion_datasource::file_format::{
45    file_type_to_format, format_as_file_type, FileFormatFactory,
46};
47use datafusion_datasource_arrow::file_format::ArrowFormat;
48#[cfg(feature = "avro")]
49use datafusion_datasource_avro::file_format::AvroFormat;
50use datafusion_datasource_csv::file_format::CsvFormat;
51use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat;
52#[cfg(feature = "parquet")]
53use datafusion_datasource_parquet::file_format::ParquetFormat;
54use datafusion_expr::{
55    dml,
56    logical_plan::{
57        builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
58        CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
59        Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
60        SubqueryAlias, TableScan, Values, Window,
61    },
62    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
63    Statement, WindowUDF,
64};
65use datafusion_expr::{
66    AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
67};
68
69use self::to_proto::{serialize_expr, serialize_exprs};
70use crate::logical_plan::to_proto::serialize_sorts;
71use datafusion_catalog::default_table_source::{provider_as_source, source_as_provider};
72use datafusion_catalog::view::ViewTable;
73use datafusion_catalog::TableProvider;
74use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
75use datafusion_datasource::ListingTableUrl;
76use datafusion_execution::TaskContext;
77use prost::bytes::BufMut;
78use prost::Message;
79
80pub mod file_formats;
81pub mod from_proto;
82pub mod to_proto;
83
/// A serializable representation of a [`LogicalPlan`].
///
/// Implementors can be decoded from / encoded to a byte buffer and can be
/// converted to and from a [`LogicalPlan`]. Conversions in both directions
/// receive a [`LogicalExtensionCodec`] so the implementation can delegate
/// handling of extension content to it.
pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
    /// Decodes an instance of `Self` from the bytes in `buf`.
    ///
    /// # Errors
    /// Returns an error if `buf` cannot be decoded.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` into the writable buffer `buf`.
    ///
    /// # Errors
    /// Returns an error if encoding fails.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts `self` into a [`LogicalPlan`].
    ///
    /// `ctx` and `extension_codec` are made available to the implementation
    /// for the duration of the conversion.
    fn try_into_logical_plan(
        &self,
        ctx: &TaskContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan>;

    /// Builds an instance of `Self` from `plan`.
    ///
    /// `extension_codec` is made available to the implementation for the
    /// duration of the conversion.
    fn try_from_logical_plan(
        plan: &LogicalPlan,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
107
/// Codec hooks for the parts of a logical plan that need caller-supplied
/// (de)serialization: extension nodes, table providers, file formats and
/// user-defined scalar / aggregate / window functions.
///
/// The four extension-node and table-provider methods are required. The
/// remaining methods have default implementations: every default `try_decode_*`
/// returns a "not implemented" error, and every default `try_encode_*` returns
/// `Ok(())` without touching the output buffer.
pub trait LogicalExtensionCodec: Debug + Send + Sync {
    /// Decodes an [`Extension`] node from `buf`.
    ///
    /// `inputs` and `ctx` are provided to the implementation alongside the
    /// encoded bytes.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[LogicalPlan],
        ctx: &TaskContext,
    ) -> Result<Extension>;

    /// Encodes the [`Extension`] node `node` into `buf`.
    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a [`TableProvider`] from `buf`.
    ///
    /// The table reference, schema and task context of the scan being decoded
    /// are passed along with the encoded bytes.
    fn try_decode_table_provider(
        &self,
        buf: &[u8],
        table_ref: &TableReference,
        schema: SchemaRef,
        ctx: &TaskContext,
    ) -> Result<Arc<dyn TableProvider>>;

    /// Encodes the [`TableProvider`] `node`, registered under `table_ref`,
    /// into `buf`.
    fn try_encode_table_provider(
        &self,
        table_ref: &TableReference,
        node: Arc<dyn TableProvider>,
        buf: &mut Vec<u8>,
    ) -> Result<()>;

    /// Decodes a [`FileFormatFactory`] from a byte buffer.
    ///
    /// The default implementation ignores its arguments and returns a
    /// "not implemented" error.
    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn FileFormatFactory>> {
        not_impl_err!("LogicalExtensionCodec is not provided for file format")
    }

    /// Encodes a [`FileFormatFactory`] into a byte buffer.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> Result<()> {
        Ok(())
    }

    /// Decodes the scalar UDF called `name` from a byte buffer.
    ///
    /// The default implementation ignores the buffer and returns a
    /// "not implemented" error that mentions `name`.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes a scalar UDF into a byte buffer.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the aggregate UDF called `name` from a byte buffer.
    ///
    /// The default implementation ignores the buffer and returns a
    /// "not implemented" error that mentions `name`.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "LogicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes an aggregate UDF into a byte buffer.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window UDF called `name` from a byte buffer.
    ///
    /// The default implementation ignores the buffer and returns a
    /// "not implemented" error that mentions `name`.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes a window UDF into a byte buffer.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
175
/// A field-less [`LogicalExtensionCodec`] whose required methods all return a
/// "not implemented" error.
#[derive(Debug, Clone)]
pub struct DefaultLogicalExtensionCodec {}
178
179impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
180    fn try_decode(
181        &self,
182        _buf: &[u8],
183        _inputs: &[LogicalPlan],
184        _ctx: &TaskContext,
185    ) -> Result<Extension> {
186        not_impl_err!("LogicalExtensionCodec is not provided")
187    }
188
189    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
190        not_impl_err!("LogicalExtensionCodec is not provided")
191    }
192
193    fn try_decode_table_provider(
194        &self,
195        _buf: &[u8],
196        _table_ref: &TableReference,
197        _schema: SchemaRef,
198        _ctx: &TaskContext,
199    ) -> Result<Arc<dyn TableProvider>> {
200        not_impl_err!("LogicalExtensionCodec is not provided")
201    }
202
203    fn try_encode_table_provider(
204        &self,
205        _table_ref: &TableReference,
206        _node: Arc<dyn TableProvider>,
207        _buf: &mut Vec<u8>,
208    ) -> Result<()> {
209        not_impl_err!("LogicalExtensionCodec is not provided")
210    }
211}
212
/// Converts an optional, boxed protobuf plan node into a logical plan.
///
/// `$PB` is borrowed via `as_ref()`; when it holds a value, that value's
/// `try_into_logical_plan($CTX, $CODEC)` result is returned. When it is
/// `None`, the macro evaluates to an `Err` built with `proto_error`.
///
/// `proto_error` and the `try_into_logical_plan` method must be resolvable at
/// the call site.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        match $PB.as_ref() {
            Some(field) => field.as_ref().try_into_logical_plan($CTX, $CODEC),
            None => Err(proto_error("Missing required field in protobuf")),
        }
    }};
}
223
224fn from_table_reference(
225    table_ref: Option<&protobuf::TableReference>,
226    error_context: &str,
227) -> Result<TableReference> {
228    let table_ref = table_ref.ok_or_else(|| {
229        internal_datafusion_err!(
230            "Protobuf deserialization error, {error_context} was missing required field name."
231        )
232    })?;
233
234    Ok(table_ref.clone().try_into()?)
235}
236
237/// Converts [LogicalPlan::TableScan] to [TableSource]
238/// method to be used to deserialize nodes
239/// serialized by [from_table_source]
240fn to_table_source(
241    node: &Option<Box<LogicalPlanNode>>,
242    ctx: &TaskContext,
243    extension_codec: &dyn LogicalExtensionCodec,
244) -> Result<Arc<dyn TableSource>> {
245    if let Some(node) = node {
246        match node.try_into_logical_plan(ctx, extension_codec)? {
247            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
248            _ => plan_err!("expected TableScan node"),
249        }
250    } else {
251        plan_err!("LogicalPlanNode should be provided")
252    }
253}
254
255/// converts [TableSource] to [LogicalPlan::TableScan]
256/// using [LogicalPlan::TableScan] was the best approach to
257/// serialize [TableSource] to [LogicalPlan::TableScan]
258fn from_table_source(
259    table_name: TableReference,
260    target: Arc<dyn TableSource>,
261    extension_codec: &dyn LogicalExtensionCodec,
262) -> Result<LogicalPlanNode> {
263    let projected_schema = target.schema().to_dfschema_ref()?;
264    let r = LogicalPlan::TableScan(TableScan {
265        table_name,
266        source: target,
267        projection: None,
268        projected_schema,
269        filters: vec![],
270        fetch: None,
271    });
272
273    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
274}
275
276impl AsLogicalPlan for LogicalPlanNode {
277    fn try_decode(buf: &[u8]) -> Result<Self>
278    where
279        Self: Sized,
280    {
281        LogicalPlanNode::decode(buf)
282            .map_err(|e| internal_datafusion_err!("failed to decode logical plan: {e:?}"))
283    }
284
285    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
286    where
287        B: BufMut,
288        Self: Sized,
289    {
290        self.encode(buf)
291            .map_err(|e| internal_datafusion_err!("failed to encode logical plan: {e:?}"))
292    }
293
294    fn try_into_logical_plan(
295        &self,
296        ctx: &TaskContext,
297        extension_codec: &dyn LogicalExtensionCodec,
298    ) -> Result<LogicalPlan> {
299        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
300            proto_error(format!(
301                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
302            ))
303        })?;
304        match plan {
305            LogicalPlanType::Values(values) => {
306                let n_cols = values.n_cols as usize;
307                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
308                    Ok(Vec::new())
309                } else if values.values_list.len() % n_cols != 0 {
310                    internal_err!(
311                        "Invalid values list length, expect {} to be divisible by {}",
312                        values.values_list.len(),
313                        n_cols
314                    )
315                } else {
316                    values
317                        .values_list
318                        .chunks_exact(n_cols)
319                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
320                        .collect::<Result<Vec<_>, _>>()
321                        .map_err(|e| e.into())
322                }?;
323
324                LogicalPlanBuilder::values(values)?.build()
325            }
326            LogicalPlanType::Projection(projection) => {
327                let input: LogicalPlan =
328                    into_logical_plan!(projection.input, ctx, extension_codec)?;
329                let expr: Vec<Expr> =
330                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
331
332                let new_proj = project(input, expr)?;
333                match projection.optional_alias.as_ref() {
334                    Some(a) => match a {
335                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
336                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
337                                Arc::new(new_proj),
338                                alias.clone(),
339                            )?))
340                        }
341                    },
342                    _ => Ok(new_proj),
343                }
344            }
345            LogicalPlanType::Selection(selection) => {
346                let input: LogicalPlan =
347                    into_logical_plan!(selection.input, ctx, extension_codec)?;
348                let expr: Expr = selection
349                    .expr
350                    .as_ref()
351                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
352                    .transpose()?
353                    .ok_or_else(|| proto_error("expression required"))?;
354                LogicalPlanBuilder::from(input).filter(expr)?.build()
355            }
356            LogicalPlanType::Window(window) => {
357                let input: LogicalPlan =
358                    into_logical_plan!(window.input, ctx, extension_codec)?;
359                let window_expr =
360                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
361                LogicalPlanBuilder::from(input).window(window_expr)?.build()
362            }
363            LogicalPlanType::Aggregate(aggregate) => {
364                let input: LogicalPlan =
365                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
366                let group_expr =
367                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
368                let aggr_expr =
369                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
370                LogicalPlanBuilder::from(input)
371                    .aggregate(group_expr, aggr_expr)?
372                    .build()
373            }
374            LogicalPlanType::ListingScan(scan) => {
375                let schema: Schema = convert_required!(scan.schema)?;
376
377                let filters =
378                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
379
380                let mut all_sort_orders = vec![];
381                for order in &scan.file_sort_order {
382                    all_sort_orders.push(from_proto::parse_sorts(
383                        &order.sort_expr_nodes,
384                        ctx,
385                        extension_codec,
386                    )?)
387                }
388
389                let file_format: Arc<dyn FileFormat> =
390                    match scan.file_format_type.as_ref().ok_or_else(|| {
391                        proto_error(format!(
392                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
393                        ))
394                    })? {
395                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
396                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
397                            #[cfg(feature = "parquet")]
398                            {
399                                let mut parquet = ParquetFormat::default();
400                                if let Some(options) = options {
401                                    parquet = parquet.with_options(options.try_into()?)
402                                }
403                                Arc::new(parquet)
404                            }
405                            #[cfg(not(feature = "parquet"))]
406                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
407                        }
408                        FileFormatType::Csv(protobuf::CsvFormat {
409                            options
410                        }) => {
411                            let mut csv = CsvFormat::default();
412                            if let Some(options) = options {
413                                csv = csv.with_options(options.try_into()?)
414                            }
415                            Arc::new(csv)
416                        },
417                        FileFormatType::Json(protobuf::NdJsonFormat {
418                            options
419                        }) => {
420                            let mut json = OtherNdJsonFormat::default();
421                            if let Some(options) = options {
422                                json = json.with_options(options.try_into()?)
423                            }
424                            Arc::new(json)
425                        }
426                        #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
427                        FileFormatType::Avro(..) => {
428                            #[cfg(feature = "avro")]
429                            {
430                                Arc::new(AvroFormat)
431                            }
432                            #[cfg(not(feature = "avro"))]
433                            panic!("Unable to process avro file since `avro` feature is not enabled");
434                        }
435                        FileFormatType::Arrow(..) => {
436                            Arc::new(ArrowFormat)
437                        }
438                    };
439
440                let table_paths = &scan
441                    .paths
442                    .iter()
443                    .map(ListingTableUrl::parse)
444                    .collect::<Result<Vec<_>, _>>()?;
445
446                let partition_columns = scan
447                    .table_partition_cols
448                    .iter()
449                    .map(|col| {
450                        let Some(arrow_type) = col.arrow_type.as_ref() else {
451                            return Err(proto_error(
452                                "Missing Arrow type in partition columns",
453                            ));
454                        };
455                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
456                            proto_error(format!("Received an unknown ArrowType: {e}"))
457                        })?;
458                        Ok((col.name.clone(), arrow_type))
459                    })
460                    .collect::<Result<Vec<_>>>()?;
461
462                let options = ListingOptions::new(file_format)
463                    .with_file_extension(&scan.file_extension)
464                    .with_table_partition_cols(partition_columns)
465                    .with_collect_stat(scan.collect_stat)
466                    .with_target_partitions(scan.target_partitions as usize)
467                    .with_file_sort_order(all_sort_orders);
468
469                let config =
470                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
471                        .with_listing_options(options)
472                        .with_schema(Arc::new(schema));
473
474                let provider = ListingTable::try_new(config)?.with_cache(
475                    ctx.runtime_env().cache_manager.get_file_statistic_cache(),
476                );
477
478                let table_name =
479                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
480
481                let mut projection = None;
482                if let Some(columns) = &scan.projection {
483                    let column_indices = columns
484                        .columns
485                        .iter()
486                        .map(|name| provider.schema().index_of(name))
487                        .collect::<Result<Vec<usize>, _>>()?;
488                    projection = Some(column_indices);
489                }
490
491                LogicalPlanBuilder::scan_with_filters(
492                    table_name,
493                    provider_as_source(Arc::new(provider)),
494                    projection,
495                    filters,
496                )?
497                .build()
498            }
499            LogicalPlanType::CustomScan(scan) => {
500                let schema: Schema = convert_required!(scan.schema)?;
501                let schema = Arc::new(schema);
502                let mut projection = None;
503                if let Some(columns) = &scan.projection {
504                    let column_indices = columns
505                        .columns
506                        .iter()
507                        .map(|name| schema.index_of(name))
508                        .collect::<Result<Vec<usize>, _>>()?;
509                    projection = Some(column_indices);
510                }
511
512                let filters =
513                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
514
515                let table_name =
516                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
517
518                let provider = extension_codec.try_decode_table_provider(
519                    &scan.custom_table_data,
520                    &table_name,
521                    schema,
522                    ctx,
523                )?;
524
525                LogicalPlanBuilder::scan_with_filters(
526                    table_name,
527                    provider_as_source(provider),
528                    projection,
529                    filters,
530                )?
531                .build()
532            }
533            LogicalPlanType::Sort(sort) => {
534                let input: LogicalPlan =
535                    into_logical_plan!(sort.input, ctx, extension_codec)?;
536                let sort_expr: Vec<SortExpr> =
537                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
538                let fetch: Option<usize> = sort.fetch.try_into().ok();
539                LogicalPlanBuilder::from(input)
540                    .sort_with_limit(sort_expr, fetch)?
541                    .build()
542            }
543            LogicalPlanType::Repartition(repartition) => {
544                use datafusion_expr::Partitioning;
545                let input: LogicalPlan =
546                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
547                use protobuf::repartition_node::PartitionMethod;
548                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
549                    internal_datafusion_err!(
550                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
551                    )
552                })?;
553
554                let partitioning_scheme = match pb_partition_method {
555                    PartitionMethod::Hash(protobuf::HashRepartition {
556                        hash_expr: pb_hash_expr,
557                        partition_count,
558                    }) => Partitioning::Hash(
559                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
560                        *partition_count as usize,
561                    ),
562                    PartitionMethod::RoundRobin(partition_count) => {
563                        Partitioning::RoundRobinBatch(*partition_count as usize)
564                    }
565                };
566
567                LogicalPlanBuilder::from(input)
568                    .repartition(partitioning_scheme)?
569                    .build()
570            }
571            LogicalPlanType::EmptyRelation(empty_relation) => {
572                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
573            }
574            LogicalPlanType::CreateExternalTable(create_extern_table) => {
575                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
576                    internal_datafusion_err!(
577                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
578                    )
579                })?;
580
581                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
582                    internal_datafusion_err!(
583                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints."
584                    )
585                })?;
586                let definition = if !create_extern_table.definition.is_empty() {
587                    Some(create_extern_table.definition.clone())
588                } else {
589                    None
590                };
591
592                let mut order_exprs = vec![];
593                for expr in &create_extern_table.order_exprs {
594                    order_exprs.push(from_proto::parse_sorts(
595                        &expr.sort_expr_nodes,
596                        ctx,
597                        extension_codec,
598                    )?);
599                }
600
601                let mut column_defaults =
602                    HashMap::with_capacity(create_extern_table.column_defaults.len());
603                for (col_name, expr) in &create_extern_table.column_defaults {
604                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
605                    column_defaults.insert(col_name.clone(), expr);
606                }
607
608                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
609                    CreateExternalTable {
610                        schema: pb_schema.try_into()?,
611                        name: from_table_reference(
612                            create_extern_table.name.as_ref(),
613                            "CreateExternalTable",
614                        )?,
615                        location: create_extern_table.location.clone(),
616                        file_type: create_extern_table.file_type.clone(),
617                        table_partition_cols: create_extern_table
618                            .table_partition_cols
619                            .clone(),
620                        order_exprs,
621                        if_not_exists: create_extern_table.if_not_exists,
622                        or_replace: create_extern_table.or_replace,
623                        temporary: create_extern_table.temporary,
624                        definition,
625                        unbounded: create_extern_table.unbounded,
626                        options: create_extern_table.options.clone(),
627                        constraints: constraints.into(),
628                        column_defaults,
629                    },
630                )))
631            }
632            LogicalPlanType::CreateView(create_view) => {
633                let plan = create_view
634                    .input.clone().ok_or_else(|| internal_datafusion_err!(
635                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input."
636                ))?
637                    .try_into_logical_plan(ctx, extension_codec)?;
638                let definition = if !create_view.definition.is_empty() {
639                    Some(create_view.definition.clone())
640                } else {
641                    None
642                };
643
644                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
645                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
646                    temporary: create_view.temporary,
647                    input: Arc::new(plan),
648                    or_replace: create_view.or_replace,
649                    definition,
650                })))
651            }
652            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
653                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
654                    internal_datafusion_err!(
655                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema."
656                    )
657                })?;
658
659                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
660                    CreateCatalogSchema {
661                        schema_name: create_catalog_schema.schema_name.clone(),
662                        if_not_exists: create_catalog_schema.if_not_exists,
663                        schema: pb_schema.try_into()?,
664                    },
665                )))
666            }
667            LogicalPlanType::CreateCatalog(create_catalog) => {
668                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
669                    internal_datafusion_err!(
670                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema."
671                    )
672                })?;
673
674                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
675                    CreateCatalog {
676                        catalog_name: create_catalog.catalog_name.clone(),
677                        if_not_exists: create_catalog.if_not_exists,
678                        schema: pb_schema.try_into()?,
679                    },
680                )))
681            }
682            LogicalPlanType::Analyze(analyze) => {
683                let input: LogicalPlan =
684                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
685                LogicalPlanBuilder::from(input)
686                    .explain(analyze.verbose, true)?
687                    .build()
688            }
689            LogicalPlanType::Explain(explain) => {
690                let input: LogicalPlan =
691                    into_logical_plan!(explain.input, ctx, extension_codec)?;
692                LogicalPlanBuilder::from(input)
693                    .explain(explain.verbose, false)?
694                    .build()
695            }
696            LogicalPlanType::SubqueryAlias(aliased_relation) => {
697                let input: LogicalPlan =
698                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
699                let alias = from_table_reference(
700                    aliased_relation.alias.as_ref(),
701                    "SubqueryAlias",
702                )?;
703                LogicalPlanBuilder::from(input).alias(alias)?.build()
704            }
705            LogicalPlanType::Limit(limit) => {
706                let input: LogicalPlan =
707                    into_logical_plan!(limit.input, ctx, extension_codec)?;
708                let skip = limit.skip.max(0) as usize;
709
710                let fetch = if limit.fetch < 0 {
711                    None
712                } else {
713                    Some(limit.fetch as usize)
714                };
715
716                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
717            }
718            LogicalPlanType::Join(join) => {
719                let left_keys: Vec<Expr> =
720                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
721                let right_keys: Vec<Expr> =
722                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
723                let join_type =
724                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
725                        proto_error(format!(
726                            "Received a JoinNode message with unknown JoinType {}",
727                            join.join_type
728                        ))
729                    })?;
730                let join_constraint = protobuf::JoinConstraint::try_from(
731                    join.join_constraint,
732                )
733                .map_err(|_| {
734                    proto_error(format!(
735                        "Received a JoinNode message with unknown JoinConstraint {}",
736                        join.join_constraint
737                    ))
738                })?;
739                let filter: Option<Expr> = join
740                    .filter
741                    .as_ref()
742                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
743                    .map_or(Ok(None), |v| v.map(Some))?;
744
745                let builder = LogicalPlanBuilder::from(into_logical_plan!(
746                    join.left,
747                    ctx,
748                    extension_codec
749                )?);
750                let builder = match join_constraint.into() {
751                    JoinConstraint::On => builder.join_with_expr_keys(
752                        into_logical_plan!(join.right, ctx, extension_codec)?,
753                        join_type.into(),
754                        (left_keys, right_keys),
755                        filter,
756                    )?,
757                    JoinConstraint::Using => {
758                        // The equijoin keys in using-join must be column.
759                        let using_keys = left_keys
760                            .into_iter()
761                            .map(|key| {
762                                key.try_as_col().cloned()
763                                    .ok_or_else(|| internal_datafusion_err!(
764                                        "Using join keys must be column references, got: {key:?}"
765                                    ))
766                            })
767                            .collect::<Result<Vec<_>, _>>()?;
768                        builder.join_using(
769                            into_logical_plan!(join.right, ctx, extension_codec)?,
770                            join_type.into(),
771                            using_keys,
772                        )?
773                    }
774                };
775
776                builder.build()
777            }
778            LogicalPlanType::Union(union) => {
779                if union.inputs.len() < 2 {
780                    return internal_err!(
781                        "Protobuf deserialization error, Union was require at least two input."
782                    );
783                }
784                let (first, rest) = union.inputs.split_first().unwrap();
785                let mut builder = LogicalPlanBuilder::from(
786                    first.try_into_logical_plan(ctx, extension_codec)?,
787                );
788
789                for i in rest {
790                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
791                    builder = builder.union(plan)?;
792                }
793                builder.build()
794            }
795            LogicalPlanType::CrossJoin(crossjoin) => {
796                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
797                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
798
799                LogicalPlanBuilder::from(left).cross_join(right)?.build()
800            }
801            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
802                let input_plans: Vec<LogicalPlan> = inputs
803                    .iter()
804                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
805                    .collect::<Result<_>>()?;
806
807                let extension_node =
808                    extension_codec.try_decode(node, &input_plans, ctx)?;
809                Ok(LogicalPlan::Extension(extension_node))
810            }
811            LogicalPlanType::Distinct(distinct) => {
812                let input: LogicalPlan =
813                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
814                LogicalPlanBuilder::from(input).distinct()?.build()
815            }
816            LogicalPlanType::DistinctOn(distinct_on) => {
817                let input: LogicalPlan =
818                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
819                let on_expr =
820                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
821                let select_expr = from_proto::parse_exprs(
822                    &distinct_on.select_expr,
823                    ctx,
824                    extension_codec,
825                )?;
826                let sort_expr = match distinct_on.sort_expr.len() {
827                    0 => None,
828                    _ => Some(from_proto::parse_sorts(
829                        &distinct_on.sort_expr,
830                        ctx,
831                        extension_codec,
832                    )?),
833                };
834                LogicalPlanBuilder::from(input)
835                    .distinct_on(on_expr, select_expr, sort_expr)?
836                    .build()
837            }
838            LogicalPlanType::ViewScan(scan) => {
839                let schema: Schema = convert_required!(scan.schema)?;
840
841                let mut projection = None;
842                if let Some(columns) = &scan.projection {
843                    let column_indices = columns
844                        .columns
845                        .iter()
846                        .map(|name| schema.index_of(name))
847                        .collect::<Result<Vec<usize>, _>>()?;
848                    projection = Some(column_indices);
849                }
850
851                let input: LogicalPlan =
852                    into_logical_plan!(scan.input, ctx, extension_codec)?;
853
854                let definition = if !scan.definition.is_empty() {
855                    Some(scan.definition.clone())
856                } else {
857                    None
858                };
859
860                let provider = ViewTable::new(input, definition);
861
862                let table_name =
863                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
864
865                LogicalPlanBuilder::scan(
866                    table_name,
867                    provider_as_source(Arc::new(provider)),
868                    projection,
869                )?
870                .build()
871            }
872            LogicalPlanType::Prepare(prepare) => {
873                let input: LogicalPlan =
874                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
875                let data_types: Vec<DataType> = prepare
876                    .data_types
877                    .iter()
878                    .map(DataType::try_from)
879                    .collect::<Result<_, _>>()?;
880                let fields: Vec<Field> = prepare
881                    .fields
882                    .iter()
883                    .map(Field::try_from)
884                    .collect::<Result<_, _>>()?;
885
886                // If the fields are empty this may have been generated by an
887                // earlier version of DataFusion, in which case the DataTypes
888                // can be used to construct the plan.
889                if fields.is_empty() {
890                    LogicalPlanBuilder::from(input)
891                        .prepare(
892                            prepare.name.clone(),
893                            data_types
894                                .into_iter()
895                                .map(|dt| Field::new("", dt, true).into())
896                                .collect(),
897                        )?
898                        .build()
899                } else {
900                    LogicalPlanBuilder::from(input)
901                        .prepare(
902                            prepare.name.clone(),
903                            fields.into_iter().map(|f| f.into()).collect(),
904                        )?
905                        .build()
906                }
907            }
908            LogicalPlanType::DropView(dropview) => {
909                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
910                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
911                    if_exists: dropview.if_exists,
912                    schema: Arc::new(convert_required!(dropview.schema)?),
913                })))
914            }
915            LogicalPlanType::CopyTo(copy) => {
916                let input: LogicalPlan =
917                    into_logical_plan!(copy.input, ctx, extension_codec)?;
918
919                let file_type: Arc<dyn FileType> = format_as_file_type(
920                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
921                );
922
923                Ok(LogicalPlan::Copy(dml::CopyTo::new(
924                    Arc::new(input),
925                    copy.output_url.clone(),
926                    copy.partition_by.clone(),
927                    file_type,
928                    Default::default(),
929                )))
930            }
931            LogicalPlanType::Unnest(unnest) => {
932                let input: LogicalPlan =
933                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
934
935                LogicalPlanBuilder::from(input)
936                    .unnest_columns_with_options(
937                        unnest.exec_columns.iter().map(|c| c.into()).collect(),
938                        into_required!(unnest.options)?,
939                    )?
940                    .build()
941            }
942            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
943                let static_term = recursive_query_node
944                    .static_term
945                    .as_ref()
946                    .ok_or_else(|| internal_datafusion_err!(
947                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term."
948                    ))?
949                    .try_into_logical_plan(ctx, extension_codec)?;
950
951                let recursive_term = recursive_query_node
952                    .recursive_term
953                    .as_ref()
954                    .ok_or_else(|| internal_datafusion_err!(
955                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term."
956                    ))?
957                    .try_into_logical_plan(ctx, extension_codec)?;
958
959                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
960                    name: recursive_query_node.name.clone(),
961                    static_term: Arc::new(static_term),
962                    recursive_term: Arc::new(recursive_term),
963                    is_distinct: recursive_query_node.is_distinct,
964                }))
965            }
966            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
967                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
968                let schema = convert_required!(*schema)?;
969                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
970                LogicalPlanBuilder::scan(
971                    name.as_str(),
972                    provider_as_source(Arc::new(cte_work_table)),
973                    None,
974                )?
975                .build()
976            }
977            LogicalPlanType::Dml(dml_node) => {
978                Ok(LogicalPlan::Dml(datafusion_expr::DmlStatement::new(
979                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
980                    to_table_source(&dml_node.target, ctx, extension_codec)?,
981                    dml_node.dml_type().into(),
982                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
983                )))
984            }
985        }
986    }
987
988    fn try_from_logical_plan(
989        plan: &LogicalPlan,
990        extension_codec: &dyn LogicalExtensionCodec,
991    ) -> Result<Self>
992    where
993        Self: Sized,
994    {
995        match plan {
996            LogicalPlan::Values(Values { values, .. }) => {
997                let n_cols = if values.is_empty() {
998                    0
999                } else {
1000                    values[0].len()
1001                } as u64;
1002                let values_list =
1003                    serialize_exprs(values.iter().flatten(), extension_codec)?;
1004                Ok(LogicalPlanNode {
1005                    logical_plan_type: Some(LogicalPlanType::Values(
1006                        protobuf::ValuesNode {
1007                            n_cols,
1008                            values_list,
1009                        },
1010                    )),
1011                })
1012            }
1013            LogicalPlan::TableScan(TableScan {
1014                table_name,
1015                source,
1016                filters,
1017                projection,
1018                ..
1019            }) => {
1020                let provider = source_as_provider(source)?;
1021                let schema = provider.schema();
1022                let source = provider.as_any();
1023
1024                let projection = match projection {
1025                    None => None,
1026                    Some(columns) => {
1027                        let column_names = columns
1028                            .iter()
1029                            .map(|i| schema.field(*i).name().to_owned())
1030                            .collect();
1031                        Some(protobuf::ProjectionColumns {
1032                            columns: column_names,
1033                        })
1034                    }
1035                };
1036
1037                let filters: Vec<protobuf::LogicalExprNode> =
1038                    serialize_exprs(filters, extension_codec)?;
1039
1040                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1041                    let any = listing_table.options().format.as_any();
1042                    let file_format_type = {
1043                        let mut maybe_some_type = None;
1044
1045                        #[cfg(feature = "parquet")]
1046                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1047                            let options = parquet.options();
1048                            maybe_some_type =
1049                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1050                                    options: Some(options.try_into()?),
1051                                }));
1052                        };
1053
1054                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1055                            let options = csv.options();
1056                            maybe_some_type =
1057                                Some(FileFormatType::Csv(protobuf::CsvFormat {
1058                                    options: Some(options.try_into()?),
1059                                }));
1060                        }
1061
1062                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1063                            let options = json.options();
1064                            maybe_some_type =
1065                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
1066                                    options: Some(options.try_into()?),
1067                                }))
1068                        }
1069
1070                        #[cfg(feature = "avro")]
1071                        if any.is::<AvroFormat>() {
1072                            maybe_some_type =
1073                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1074                        }
1075
1076                        if any.is::<ArrowFormat>() {
1077                            maybe_some_type =
1078                                Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1079                        }
1080
1081                        if let Some(file_format_type) = maybe_some_type {
1082                            file_format_type
1083                        } else {
1084                            return Err(proto_error(format!(
1085                                "Error deserializing unknown file format: {:?}",
1086                                listing_table.options().format
1087                            )));
1088                        }
1089                    };
1090
1091                    let options = listing_table.options();
1092
1093                    let mut builder = SchemaBuilder::from(schema.as_ref());
1094                    for (idx, field) in schema.fields().iter().enumerate().rev() {
1095                        if options
1096                            .table_partition_cols
1097                            .iter()
1098                            .any(|(name, _)| name == field.name())
1099                        {
1100                            builder.remove(idx);
1101                        }
1102                    }
1103
1104                    let schema = builder.finish();
1105
1106                    let schema: protobuf::Schema = (&schema).try_into()?;
1107
1108                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1109                    for order in &options.file_sort_order {
1110                        let expr_vec = SortExprNodeCollection {
1111                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1112                        };
1113                        exprs_vec.push(expr_vec);
1114                    }
1115
1116                    let partition_columns = options
1117                        .table_partition_cols
1118                        .iter()
1119                        .map(|(name, arrow_type)| {
1120                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1121                                .map_err(|e| {
1122                                    proto_error(format!(
1123                                        "Received an unknown ArrowType: {e}"
1124                                    ))
1125                                })?;
1126                            Ok(protobuf::PartitionColumn {
1127                                name: name.clone(),
1128                                arrow_type: Some(arrow_type),
1129                            })
1130                        })
1131                        .collect::<Result<Vec<_>>>()?;
1132
1133                    Ok(LogicalPlanNode {
1134                        logical_plan_type: Some(LogicalPlanType::ListingScan(
1135                            protobuf::ListingTableScanNode {
1136                                file_format_type: Some(file_format_type),
1137                                table_name: Some(table_name.clone().into()),
1138                                collect_stat: options.collect_stat,
1139                                file_extension: options.file_extension.clone(),
1140                                table_partition_cols: partition_columns,
1141                                paths: listing_table
1142                                    .table_paths()
1143                                    .iter()
1144                                    .map(|x| x.to_string())
1145                                    .collect(),
1146                                schema: Some(schema),
1147                                projection,
1148                                filters,
1149                                target_partitions: options.target_partitions as u32,
1150                                file_sort_order: exprs_vec,
1151                            },
1152                        )),
1153                    })
1154                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1155                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1156                    Ok(LogicalPlanNode {
1157                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1158                            protobuf::ViewTableScanNode {
1159                                table_name: Some(table_name.clone().into()),
1160                                input: Some(Box::new(
1161                                    LogicalPlanNode::try_from_logical_plan(
1162                                        view_table.logical_plan(),
1163                                        extension_codec,
1164                                    )?,
1165                                )),
1166                                schema: Some(schema),
1167                                projection,
1168                                definition: view_table
1169                                    .definition()
1170                                    .map(|s| s.to_string())
1171                                    .unwrap_or_default(),
1172                            },
1173                        ))),
1174                    })
1175                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1176                {
1177                    let name = cte_work_table.name().to_string();
1178                    let schema = cte_work_table.schema();
1179                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1180
1181                    Ok(LogicalPlanNode {
1182                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1183                            protobuf::CteWorkTableScanNode {
1184                                name,
1185                                schema: Some(schema),
1186                            },
1187                        )),
1188                    })
1189                } else {
1190                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1191                    let mut bytes = vec![];
1192                    extension_codec
1193                        .try_encode_table_provider(table_name, provider, &mut bytes)
1194                        .map_err(|e| context!("Error serializing custom table", e))?;
1195                    let scan = CustomScan(CustomTableScanNode {
1196                        table_name: Some(table_name.clone().into()),
1197                        projection,
1198                        schema: Some(schema),
1199                        filters,
1200                        custom_table_data: bytes,
1201                    });
1202                    let node = LogicalPlanNode {
1203                        logical_plan_type: Some(scan),
1204                    };
1205                    Ok(node)
1206                }
1207            }
1208            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1209                Ok(LogicalPlanNode {
1210                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1211                        protobuf::ProjectionNode {
1212                            input: Some(Box::new(
1213                                LogicalPlanNode::try_from_logical_plan(
1214                                    input.as_ref(),
1215                                    extension_codec,
1216                                )?,
1217                            )),
1218                            expr: serialize_exprs(expr, extension_codec)?,
1219                            optional_alias: None,
1220                        },
1221                    ))),
1222                })
1223            }
1224            LogicalPlan::Filter(filter) => {
1225                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1226                    filter.input.as_ref(),
1227                    extension_codec,
1228                )?;
1229                Ok(LogicalPlanNode {
1230                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1231                        protobuf::SelectionNode {
1232                            input: Some(Box::new(input)),
1233                            expr: Some(serialize_expr(
1234                                &filter.predicate,
1235                                extension_codec,
1236                            )?),
1237                        },
1238                    ))),
1239                })
1240            }
1241            LogicalPlan::Distinct(Distinct::All(input)) => {
1242                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1243                    input.as_ref(),
1244                    extension_codec,
1245                )?;
1246                Ok(LogicalPlanNode {
1247                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1248                        protobuf::DistinctNode {
1249                            input: Some(Box::new(input)),
1250                        },
1251                    ))),
1252                })
1253            }
1254            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1255                on_expr,
1256                select_expr,
1257                sort_expr,
1258                input,
1259                ..
1260            })) => {
1261                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1262                    input.as_ref(),
1263                    extension_codec,
1264                )?;
1265                let sort_expr = match sort_expr {
1266                    None => vec![],
1267                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1268                };
1269                Ok(LogicalPlanNode {
1270                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1271                        protobuf::DistinctOnNode {
1272                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1273                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1274                            sort_expr,
1275                            input: Some(Box::new(input)),
1276                        },
1277                    ))),
1278                })
1279            }
1280            LogicalPlan::Window(Window {
1281                input, window_expr, ..
1282            }) => {
1283                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1284                    input.as_ref(),
1285                    extension_codec,
1286                )?;
1287                Ok(LogicalPlanNode {
1288                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1289                        protobuf::WindowNode {
1290                            input: Some(Box::new(input)),
1291                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1292                        },
1293                    ))),
1294                })
1295            }
1296            LogicalPlan::Aggregate(Aggregate {
1297                group_expr,
1298                aggr_expr,
1299                input,
1300                ..
1301            }) => {
1302                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1303                    input.as_ref(),
1304                    extension_codec,
1305                )?;
1306                Ok(LogicalPlanNode {
1307                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1308                        protobuf::AggregateNode {
1309                            input: Some(Box::new(input)),
1310                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1311                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1312                        },
1313                    ))),
1314                })
1315            }
1316            LogicalPlan::Join(Join {
1317                left,
1318                right,
1319                on,
1320                filter,
1321                join_type,
1322                join_constraint,
1323                null_equality,
1324                ..
1325            }) => {
1326                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1327                    left.as_ref(),
1328                    extension_codec,
1329                )?;
1330                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1331                    right.as_ref(),
1332                    extension_codec,
1333                )?;
1334                let (left_join_key, right_join_key) = on
1335                    .iter()
1336                    .map(|(l, r)| {
1337                        Ok((
1338                            serialize_expr(l, extension_codec)?,
1339                            serialize_expr(r, extension_codec)?,
1340                        ))
1341                    })
1342                    .collect::<Result<Vec<_>, ToProtoError>>()?
1343                    .into_iter()
1344                    .unzip();
1345                let join_type: protobuf::JoinType = join_type.to_owned().into();
1346                let join_constraint: protobuf::JoinConstraint =
1347                    join_constraint.to_owned().into();
1348                let null_equality: protobuf::NullEquality =
1349                    null_equality.to_owned().into();
1350                let filter = filter
1351                    .as_ref()
1352                    .map(|e| serialize_expr(e, extension_codec))
1353                    .map_or(Ok(None), |v| v.map(Some))?;
1354                Ok(LogicalPlanNode {
1355                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1356                        protobuf::JoinNode {
1357                            left: Some(Box::new(left)),
1358                            right: Some(Box::new(right)),
1359                            join_type: join_type.into(),
1360                            join_constraint: join_constraint.into(),
1361                            left_join_key,
1362                            right_join_key,
1363                            null_equality: null_equality.into(),
1364                            filter,
1365                        },
1366                    ))),
1367                })
1368            }
1369            LogicalPlan::Subquery(_) => {
1370                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1371            }
1372            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1373                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1374                    input.as_ref(),
1375                    extension_codec,
1376                )?;
1377                Ok(LogicalPlanNode {
1378                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1379                        protobuf::SubqueryAliasNode {
1380                            input: Some(Box::new(input)),
1381                            alias: Some((*alias).clone().into()),
1382                        },
1383                    ))),
1384                })
1385            }
1386            LogicalPlan::Limit(limit) => {
1387                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1388                    limit.input.as_ref(),
1389                    extension_codec,
1390                )?;
1391                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1392                    return Err(proto_error(
1393                        "LogicalPlan::Limit only supports literal skip values",
1394                    ));
1395                };
1396                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1397                    return Err(proto_error(
1398                        "LogicalPlan::Limit only supports literal fetch values",
1399                    ));
1400                };
1401
1402                Ok(LogicalPlanNode {
1403                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1404                        protobuf::LimitNode {
1405                            input: Some(Box::new(input)),
1406                            skip: skip as i64,
1407                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1408                        },
1409                    ))),
1410                })
1411            }
1412            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1413                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1414                    input.as_ref(),
1415                    extension_codec,
1416                )?;
1417                let sort_expr: Vec<protobuf::SortExprNode> =
1418                    serialize_sorts(expr, extension_codec)?;
1419                Ok(LogicalPlanNode {
1420                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1421                        protobuf::SortNode {
1422                            input: Some(Box::new(input)),
1423                            expr: sort_expr,
1424                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1425                        },
1426                    ))),
1427                })
1428            }
1429            LogicalPlan::Repartition(Repartition {
1430                input,
1431                partitioning_scheme,
1432            }) => {
1433                use datafusion_expr::Partitioning;
1434                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1435                    input.as_ref(),
1436                    extension_codec,
1437                )?;
1438
1439                // The common usize field of each partitioning variant is the partition count.
1440                // It is serialized as u64 to avoid any problems with large values; most data clusters are probably uniformly 64-bit anyway.
1441                use protobuf::repartition_node::PartitionMethod;
1442
1443                let pb_partition_method = match partitioning_scheme {
1444                    Partitioning::Hash(exprs, partition_count) => {
1445                        PartitionMethod::Hash(protobuf::HashRepartition {
1446                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1447                            partition_count: *partition_count as u64,
1448                        })
1449                    }
1450                    Partitioning::RoundRobinBatch(partition_count) => {
1451                        PartitionMethod::RoundRobin(*partition_count as u64)
1452                    }
1453                    Partitioning::DistributeBy(_) => {
1454                        return not_impl_err!("DistributeBy")
1455                    }
1456                };
1457
1458                Ok(LogicalPlanNode {
1459                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1460                        protobuf::RepartitionNode {
1461                            input: Some(Box::new(input)),
1462                            partition_method: Some(pb_partition_method),
1463                        },
1464                    ))),
1465                })
1466            }
1467            LogicalPlan::EmptyRelation(EmptyRelation {
1468                produce_one_row, ..
1469            }) => Ok(LogicalPlanNode {
1470                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1471                    protobuf::EmptyRelationNode {
1472                        produce_one_row: *produce_one_row,
1473                    },
1474                )),
1475            }),
1476            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1477                CreateExternalTable {
1478                    name,
1479                    location,
1480                    file_type,
1481                    schema: df_schema,
1482                    table_partition_cols,
1483                    if_not_exists,
1484                    or_replace,
1485                    definition,
1486                    order_exprs,
1487                    unbounded,
1488                    options,
1489                    constraints,
1490                    column_defaults,
1491                    temporary,
1492                },
1493            )) => {
1494                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1495                for order in order_exprs {
1496                    let temp = SortExprNodeCollection {
1497                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1498                    };
1499                    converted_order_exprs.push(temp);
1500                }
1501
1502                let mut converted_column_defaults =
1503                    HashMap::with_capacity(column_defaults.len());
1504                for (col_name, expr) in column_defaults {
1505                    converted_column_defaults
1506                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1507                }
1508
1509                Ok(LogicalPlanNode {
1510                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1511                        protobuf::CreateExternalTableNode {
1512                            name: Some(name.clone().into()),
1513                            location: location.clone(),
1514                            file_type: file_type.clone(),
1515                            schema: Some(df_schema.try_into()?),
1516                            table_partition_cols: table_partition_cols.clone(),
1517                            if_not_exists: *if_not_exists,
1518                            or_replace: *or_replace,
1519                            temporary: *temporary,
1520                            order_exprs: converted_order_exprs,
1521                            definition: definition.clone().unwrap_or_default(),
1522                            unbounded: *unbounded,
1523                            options: options.clone(),
1524                            constraints: Some(constraints.clone().into()),
1525                            column_defaults: converted_column_defaults,
1526                        },
1527                    )),
1528                })
1529            }
1530            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1531                name,
1532                input,
1533                or_replace,
1534                definition,
1535                temporary,
1536            })) => Ok(LogicalPlanNode {
1537                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1538                    protobuf::CreateViewNode {
1539                        name: Some(name.clone().into()),
1540                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1541                            input,
1542                            extension_codec,
1543                        )?)),
1544                        or_replace: *or_replace,
1545                        temporary: *temporary,
1546                        definition: definition.clone().unwrap_or_default(),
1547                    },
1548                ))),
1549            }),
1550            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1551                CreateCatalogSchema {
1552                    schema_name,
1553                    if_not_exists,
1554                    schema: df_schema,
1555                },
1556            )) => Ok(LogicalPlanNode {
1557                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1558                    protobuf::CreateCatalogSchemaNode {
1559                        schema_name: schema_name.clone(),
1560                        if_not_exists: *if_not_exists,
1561                        schema: Some(df_schema.try_into()?),
1562                    },
1563                )),
1564            }),
1565            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1566                catalog_name,
1567                if_not_exists,
1568                schema: df_schema,
1569            })) => Ok(LogicalPlanNode {
1570                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1571                    protobuf::CreateCatalogNode {
1572                        catalog_name: catalog_name.clone(),
1573                        if_not_exists: *if_not_exists,
1574                        schema: Some(df_schema.try_into()?),
1575                    },
1576                )),
1577            }),
1578            LogicalPlan::Analyze(a) => {
1579                let input = LogicalPlanNode::try_from_logical_plan(
1580                    a.input.as_ref(),
1581                    extension_codec,
1582                )?;
1583                Ok(LogicalPlanNode {
1584                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1585                        protobuf::AnalyzeNode {
1586                            input: Some(Box::new(input)),
1587                            verbose: a.verbose,
1588                        },
1589                    ))),
1590                })
1591            }
1592            LogicalPlan::Explain(a) => {
1593                let input = LogicalPlanNode::try_from_logical_plan(
1594                    a.plan.as_ref(),
1595                    extension_codec,
1596                )?;
1597                Ok(LogicalPlanNode {
1598                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1599                        protobuf::ExplainNode {
1600                            input: Some(Box::new(input)),
1601                            verbose: a.verbose,
1602                        },
1603                    ))),
1604                })
1605            }
1606            LogicalPlan::Union(union) => {
1607                let inputs: Vec<LogicalPlanNode> = union
1608                    .inputs
1609                    .iter()
1610                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1611                    .collect::<Result<_>>()?;
1612                Ok(LogicalPlanNode {
1613                    logical_plan_type: Some(LogicalPlanType::Union(
1614                        protobuf::UnionNode { inputs },
1615                    )),
1616                })
1617            }
1618            LogicalPlan::Extension(extension) => {
1619                let mut buf: Vec<u8> = vec![];
1620                extension_codec.try_encode(extension, &mut buf)?;
1621
1622                let inputs: Vec<LogicalPlanNode> = extension
1623                    .node
1624                    .inputs()
1625                    .iter()
1626                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1627                    .collect::<Result<_>>()?;
1628
1629                Ok(LogicalPlanNode {
1630                    logical_plan_type: Some(LogicalPlanType::Extension(
1631                        LogicalExtensionNode { node: buf, inputs },
1632                    )),
1633                })
1634            }
1635            LogicalPlan::Statement(Statement::Prepare(Prepare {
1636                name,
1637                fields,
1638                input,
1639            })) => {
1640                let input =
1641                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1642                Ok(LogicalPlanNode {
1643                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1644                        protobuf::PrepareNode {
1645                            name: name.clone(),
1646                            input: Some(Box::new(input)),
1647                            // Store the DataTypes for reading by older DataFusion
1648                            data_types: fields
1649                                .iter()
1650                                .map(|f| f.data_type().try_into())
1651                                .collect::<Result<Vec<_>, _>>()?,
1652                            // Store the Fields for current and future DataFusion
1653                            fields: fields
1654                                .iter()
1655                                .map(|f| f.as_ref().try_into())
1656                                .collect::<Result<Vec<_>, _>>()?,
1657                        },
1658                    ))),
1659                })
1660            }
1661            LogicalPlan::Unnest(Unnest {
1662                input,
1663                exec_columns,
1664                list_type_columns,
1665                struct_type_columns,
1666                dependency_indices,
1667                schema,
1668                options,
1669            }) => {
1670                let input =
1671                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1672                let proto_unnest_list_items = list_type_columns
1673                    .iter()
1674                    .map(|(index, ul)| ColumnUnnestListItem {
1675                        input_index: *index as _,
1676                        recursion: Some(ColumnUnnestListRecursion {
1677                            output_column: Some(ul.output_column.to_owned().into()),
1678                            depth: ul.depth as _,
1679                        }),
1680                    })
1681                    .collect();
1682                Ok(LogicalPlanNode {
1683                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1684                        protobuf::UnnestNode {
1685                            input: Some(Box::new(input)),
1686                            exec_columns: exec_columns
1687                                .iter()
1688                                .map(|col| col.into())
1689                                .collect(),
1690                            list_type_columns: proto_unnest_list_items,
1691                            struct_type_columns: struct_type_columns
1692                                .iter()
1693                                .map(|c| *c as u64)
1694                                .collect(),
1695                            dependency_indices: dependency_indices
1696                                .iter()
1697                                .map(|c| *c as u64)
1698                                .collect(),
1699                            schema: Some(schema.try_into()?),
1700                            options: Some(options.into()),
1701                        },
1702                    ))),
1703                })
1704            }
1705            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1706                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1707            )),
1708            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1709                "LogicalPlan serde is not yet implemented for CreateIndex",
1710            )),
1711            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1712                "LogicalPlan serde is not yet implemented for DropTable",
1713            )),
1714            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1715                name,
1716                if_exists,
1717                schema,
1718            })) => Ok(LogicalPlanNode {
1719                logical_plan_type: Some(LogicalPlanType::DropView(
1720                    protobuf::DropViewNode {
1721                        name: Some(name.clone().into()),
1722                        if_exists: *if_exists,
1723                        schema: Some(schema.try_into()?),
1724                    },
1725                )),
1726            }),
1727            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1728                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1729            )),
1730            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1731                "LogicalPlan serde is not yet implemented for CreateFunction",
1732            )),
1733            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1734                "LogicalPlan serde is not yet implemented for DropFunction",
1735            )),
1736            LogicalPlan::Statement(_) => Err(proto_error(
1737                "LogicalPlan serde is not yet implemented for Statement",
1738            )),
1739            LogicalPlan::Dml(DmlStatement {
1740                table_name,
1741                target,
1742                op,
1743                input,
1744                ..
1745            }) => {
1746                let input =
1747                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1748                let dml_type: dml_node::Type = op.into();
1749                Ok(LogicalPlanNode {
1750                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1751                        input: Some(Box::new(input)),
1752                        target: Some(Box::new(from_table_source(
1753                            table_name.clone(),
1754                            Arc::clone(target),
1755                            extension_codec,
1756                        )?)),
1757                        table_name: Some(table_name.clone().into()),
1758                        dml_type: dml_type.into(),
1759                    }))),
1760                })
1761            }
1762            LogicalPlan::Copy(dml::CopyTo {
1763                input,
1764                output_url,
1765                file_type,
1766                partition_by,
1767                ..
1768            }) => {
1769                let input =
1770                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1771                let mut buf = Vec::new();
1772                extension_codec
1773                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1774
1775                Ok(LogicalPlanNode {
1776                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1777                        protobuf::CopyToNode {
1778                            input: Some(Box::new(input)),
1779                            output_url: output_url.to_string(),
1780                            file_type: buf,
1781                            partition_by: partition_by.clone(),
1782                        },
1783                    ))),
1784                })
1785            }
1786            LogicalPlan::DescribeTable(_) => Err(proto_error(
1787                "LogicalPlan serde is not yet implemented for DescribeTable",
1788            )),
1789            LogicalPlan::RecursiveQuery(recursive) => {
1790                let static_term = LogicalPlanNode::try_from_logical_plan(
1791                    recursive.static_term.as_ref(),
1792                    extension_codec,
1793                )?;
1794                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1795                    recursive.recursive_term.as_ref(),
1796                    extension_codec,
1797                )?;
1798
1799                Ok(LogicalPlanNode {
1800                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1801                        protobuf::RecursiveQueryNode {
1802                            name: recursive.name.clone(),
1803                            static_term: Some(Box::new(static_term)),
1804                            recursive_term: Some(Box::new(recursive_term)),
1805                            is_distinct: recursive.is_distinct,
1806                        },
1807                    ))),
1808                })
1809            }
1810        }
1811    }
1812}