datafusion_proto/logical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection, dml_node,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, LogicalExtensionNode, LogicalPlanNode,
31        listing_table_scan_node::FileFormatType, logical_plan_node::LogicalPlanType,
32    },
33};
34
35use crate::protobuf::{ToProtoError, proto_error};
36use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
37use datafusion_catalog::cte_worktable::CteWorkTable;
38use datafusion_common::file_options::file_type::FileType;
39use datafusion_common::{
40    Result, TableReference, ToDFSchema, assert_or_internal_err, context,
41    internal_datafusion_err, internal_err, not_impl_err, plan_err,
42};
43use datafusion_datasource::file_format::FileFormat;
44use datafusion_datasource::file_format::{
45    FileFormatFactory, file_type_to_format, format_as_file_type,
46};
47use datafusion_datasource_arrow::file_format::ArrowFormat;
48#[cfg(feature = "avro")]
49use datafusion_datasource_avro::file_format::AvroFormat;
50use datafusion_datasource_csv::file_format::CsvFormat;
51use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat;
52#[cfg(feature = "parquet")]
53use datafusion_datasource_parquet::file_format::ParquetFormat;
54use datafusion_expr::{
55    AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
56};
57use datafusion_expr::{
58    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
59    Statement, WindowUDF, dml,
60    logical_plan::{
61        Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
62        DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare,
63        Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
64        builder::project,
65    },
66};
67
68use self::to_proto::{serialize_expr, serialize_exprs};
69use crate::logical_plan::to_proto::serialize_sorts;
70use datafusion_catalog::TableProvider;
71use datafusion_catalog::default_table_source::{provider_as_source, source_as_provider};
72use datafusion_catalog::view::ViewTable;
73use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
74use datafusion_datasource::ListingTableUrl;
75use datafusion_execution::TaskContext;
76use prost::Message;
77use prost::bytes::BufMut;
78
79pub mod file_formats;
80pub mod from_proto;
81pub mod to_proto;
82
83pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
84    fn try_decode(buf: &[u8]) -> Result<Self>
85    where
86        Self: Sized;
87
88    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
89    where
90        B: BufMut,
91        Self: Sized;
92
93    fn try_into_logical_plan(
94        &self,
95        ctx: &TaskContext,
96        extension_codec: &dyn LogicalExtensionCodec,
97    ) -> Result<LogicalPlan>;
98
99    fn try_from_logical_plan(
100        plan: &LogicalPlan,
101        extension_codec: &dyn LogicalExtensionCodec,
102    ) -> Result<Self>
103    where
104        Self: Sized;
105}
106
107pub trait LogicalExtensionCodec: Debug + Send + Sync {
108    fn try_decode(
109        &self,
110        buf: &[u8],
111        inputs: &[LogicalPlan],
112        ctx: &TaskContext,
113    ) -> Result<Extension>;
114
115    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
116
117    fn try_decode_table_provider(
118        &self,
119        buf: &[u8],
120        table_ref: &TableReference,
121        schema: SchemaRef,
122        ctx: &TaskContext,
123    ) -> Result<Arc<dyn TableProvider>>;
124
125    fn try_encode_table_provider(
126        &self,
127        table_ref: &TableReference,
128        node: Arc<dyn TableProvider>,
129        buf: &mut Vec<u8>,
130    ) -> Result<()>;
131
132    fn try_decode_file_format(
133        &self,
134        _buf: &[u8],
135        _ctx: &TaskContext,
136    ) -> Result<Arc<dyn FileFormatFactory>> {
137        not_impl_err!("LogicalExtensionCodec is not provided for file format")
138    }
139
140    fn try_encode_file_format(
141        &self,
142        _buf: &mut Vec<u8>,
143        _node: Arc<dyn FileFormatFactory>,
144    ) -> Result<()> {
145        Ok(())
146    }
147
148    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
149        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
150    }
151
152    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
153        Ok(())
154    }
155
156    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
157        not_impl_err!(
158            "LogicalExtensionCodec is not provided for aggregate function {name}"
159        )
160    }
161
162    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
163        Ok(())
164    }
165
166    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
167        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
168    }
169
170    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
171        Ok(())
172    }
173}
174
/// Stateless [`LogicalExtensionCodec`] placeholder.
///
/// Its trait implementation rejects every required encode/decode operation
/// with a "not implemented" error.
#[derive(Debug, Clone)]
pub struct DefaultLogicalExtensionCodec {}
177
178impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
179    fn try_decode(
180        &self,
181        _buf: &[u8],
182        _inputs: &[LogicalPlan],
183        _ctx: &TaskContext,
184    ) -> Result<Extension> {
185        not_impl_err!("LogicalExtensionCodec is not provided")
186    }
187
188    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
189        not_impl_err!("LogicalExtensionCodec is not provided")
190    }
191
192    fn try_decode_table_provider(
193        &self,
194        _buf: &[u8],
195        _table_ref: &TableReference,
196        _schema: SchemaRef,
197        _ctx: &TaskContext,
198    ) -> Result<Arc<dyn TableProvider>> {
199        not_impl_err!("LogicalExtensionCodec is not provided")
200    }
201
202    fn try_encode_table_provider(
203        &self,
204        _table_ref: &TableReference,
205        _node: Arc<dyn TableProvider>,
206        _buf: &mut Vec<u8>,
207    ) -> Result<()> {
208        not_impl_err!("LogicalExtensionCodec is not provided")
209    }
210}
211
/// Decodes an optional protobuf plan node into a logical plan.
///
/// * `$PB` – an expression whose `as_ref()` yields an `Option`; the contained
///   value's own `as_ref()` must yield something with a
///   `try_into_logical_plan` method (e.g. an `Option<Box<LogicalPlanNode>>`
///   field).
/// * `$CTX`, `$CODEC` – forwarded unchanged to `try_into_logical_plan`.
///
/// Evaluates to the result of `try_into_logical_plan` when the field is
/// present, and to `Err(proto_error("Missing required field in protobuf"))`
/// when it is `None`.
///
/// The error helper is referenced through `$crate` so that the exported macro
/// does not depend on the caller having `proto_error` imported. The
/// `try_into_logical_plan` method is still resolved at the call site, so the
/// trait providing it must be in scope there.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            field.as_ref().try_into_logical_plan($CTX, $CODEC)
        } else {
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
222
223fn from_table_reference(
224    table_ref: Option<&protobuf::TableReference>,
225    error_context: &str,
226) -> Result<TableReference> {
227    let table_ref = table_ref.ok_or_else(|| {
228        internal_datafusion_err!(
229            "Protobuf deserialization error, {error_context} was missing required field name."
230        )
231    })?;
232
233    Ok(table_ref.clone().try_into()?)
234}
235
236/// Converts [LogicalPlan::TableScan] to [TableSource]
237/// method to be used to deserialize nodes
238/// serialized by [from_table_source]
239fn to_table_source(
240    node: &Option<Box<LogicalPlanNode>>,
241    ctx: &TaskContext,
242    extension_codec: &dyn LogicalExtensionCodec,
243) -> Result<Arc<dyn TableSource>> {
244    if let Some(node) = node {
245        match node.try_into_logical_plan(ctx, extension_codec)? {
246            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
247            _ => plan_err!("expected TableScan node"),
248        }
249    } else {
250        plan_err!("LogicalPlanNode should be provided")
251    }
252}
253
254/// converts [TableSource] to [LogicalPlan::TableScan]
255/// using [LogicalPlan::TableScan] was the best approach to
256/// serialize [TableSource] to [LogicalPlan::TableScan]
257fn from_table_source(
258    table_name: TableReference,
259    target: Arc<dyn TableSource>,
260    extension_codec: &dyn LogicalExtensionCodec,
261) -> Result<LogicalPlanNode> {
262    let projected_schema = target.schema().to_dfschema_ref()?;
263    let r = LogicalPlan::TableScan(TableScan {
264        table_name,
265        source: target,
266        projection: None,
267        projected_schema,
268        filters: vec![],
269        fetch: None,
270    });
271
272    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
273}
274
275impl AsLogicalPlan for LogicalPlanNode {
276    fn try_decode(buf: &[u8]) -> Result<Self>
277    where
278        Self: Sized,
279    {
280        LogicalPlanNode::decode(buf)
281            .map_err(|e| internal_datafusion_err!("failed to decode logical plan: {e:?}"))
282    }
283
284    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
285    where
286        B: BufMut,
287        Self: Sized,
288    {
289        self.encode(buf)
290            .map_err(|e| internal_datafusion_err!("failed to encode logical plan: {e:?}"))
291    }
292
293    fn try_into_logical_plan(
294        &self,
295        ctx: &TaskContext,
296        extension_codec: &dyn LogicalExtensionCodec,
297    ) -> Result<LogicalPlan> {
298        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
299            proto_error(format!(
300                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
301            ))
302        })?;
303        match plan {
304            LogicalPlanType::Values(values) => {
305                let n_cols = values.n_cols as usize;
306                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
307                    Ok(Vec::new())
308                } else if values.values_list.len() % n_cols != 0 {
309                    internal_err!(
310                        "Invalid values list length, expect {} to be divisible by {}",
311                        values.values_list.len(),
312                        n_cols
313                    )
314                } else {
315                    values
316                        .values_list
317                        .chunks_exact(n_cols)
318                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
319                        .collect::<Result<Vec<_>, _>>()
320                        .map_err(|e| e.into())
321                }?;
322
323                LogicalPlanBuilder::values(values)?.build()
324            }
325            LogicalPlanType::Projection(projection) => {
326                let input: LogicalPlan =
327                    into_logical_plan!(projection.input, ctx, extension_codec)?;
328                let expr: Vec<Expr> =
329                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
330
331                let new_proj = project(input, expr)?;
332                match projection.optional_alias.as_ref() {
333                    Some(a) => match a {
334                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
335                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
336                                Arc::new(new_proj),
337                                alias.clone(),
338                            )?))
339                        }
340                    },
341                    _ => Ok(new_proj),
342                }
343            }
344            LogicalPlanType::Selection(selection) => {
345                let input: LogicalPlan =
346                    into_logical_plan!(selection.input, ctx, extension_codec)?;
347                let expr: Expr = selection
348                    .expr
349                    .as_ref()
350                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
351                    .transpose()?
352                    .ok_or_else(|| proto_error("expression required"))?;
353                LogicalPlanBuilder::from(input).filter(expr)?.build()
354            }
355            LogicalPlanType::Window(window) => {
356                let input: LogicalPlan =
357                    into_logical_plan!(window.input, ctx, extension_codec)?;
358                let window_expr =
359                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
360                LogicalPlanBuilder::from(input).window(window_expr)?.build()
361            }
362            LogicalPlanType::Aggregate(aggregate) => {
363                let input: LogicalPlan =
364                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
365                let group_expr =
366                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
367                let aggr_expr =
368                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
369                LogicalPlanBuilder::from(input)
370                    .aggregate(group_expr, aggr_expr)?
371                    .build()
372            }
373            LogicalPlanType::ListingScan(scan) => {
374                let schema: Schema = convert_required!(scan.schema)?;
375
376                let filters =
377                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
378
379                let mut all_sort_orders = vec![];
380                for order in &scan.file_sort_order {
381                    all_sort_orders.push(from_proto::parse_sorts(
382                        &order.sort_expr_nodes,
383                        ctx,
384                        extension_codec,
385                    )?)
386                }
387
388                let file_format: Arc<dyn FileFormat> =
389                    match scan.file_format_type.as_ref().ok_or_else(|| {
390                        proto_error(format!(
391                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
392                        ))
393                    })? {
394                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
395                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
396                            #[cfg(feature = "parquet")]
397                            {
398                                let mut parquet = ParquetFormat::default();
399                                if let Some(options) = options {
400                                    parquet = parquet.with_options(options.try_into()?)
401                                }
402                                Arc::new(parquet)
403                            }
404                            #[cfg(not(feature = "parquet"))]
405                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
406                        }
407                        FileFormatType::Csv(protobuf::CsvFormat {
408                            options
409                        }) => {
410                            let mut csv = CsvFormat::default();
411                            if let Some(options) = options {
412                                csv = csv.with_options(options.try_into()?)
413                            }
414                            Arc::new(csv)
415                        },
416                        FileFormatType::Json(protobuf::NdJsonFormat {
417                            options
418                        }) => {
419                            let mut json = OtherNdJsonFormat::default();
420                            if let Some(options) = options {
421                                json = json.with_options(options.try_into()?)
422                            }
423                            Arc::new(json)
424                        }
425                        FileFormatType::Avro(..) => {
426                            #[cfg(feature = "avro")]
427                            {
428                                Arc::new(AvroFormat)
429                            }
430                            #[cfg(not(feature = "avro"))]
431                            {
432                                panic!(
433                                    "Unable to process avro file since `avro` feature is not enabled"
434                                );
435                            }
436                        }
437                        FileFormatType::Arrow(..) => {
438                            Arc::new(ArrowFormat)
439                        }
440                    };
441
442                let table_paths = &scan
443                    .paths
444                    .iter()
445                    .map(ListingTableUrl::parse)
446                    .collect::<Result<Vec<_>, _>>()?;
447
448                let partition_columns = scan
449                    .table_partition_cols
450                    .iter()
451                    .map(|col| {
452                        let Some(arrow_type) = col.arrow_type.as_ref() else {
453                            return Err(proto_error(
454                                "Missing Arrow type in partition columns",
455                            ));
456                        };
457                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
458                            proto_error(format!("Received an unknown ArrowType: {e}"))
459                        })?;
460                        Ok((col.name.clone(), arrow_type))
461                    })
462                    .collect::<Result<Vec<_>>>()?;
463
464                let options = ListingOptions::new(file_format)
465                    .with_file_extension(&scan.file_extension)
466                    .with_table_partition_cols(partition_columns)
467                    .with_collect_stat(scan.collect_stat)
468                    .with_target_partitions(scan.target_partitions as usize)
469                    .with_file_sort_order(all_sort_orders);
470
471                let config =
472                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
473                        .with_listing_options(options)
474                        .with_schema(Arc::new(schema));
475
476                let provider = ListingTable::try_new(config)?.with_cache(
477                    ctx.runtime_env().cache_manager.get_file_statistic_cache(),
478                );
479
480                let table_name =
481                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
482
483                let mut projection = None;
484                if let Some(columns) = &scan.projection {
485                    let column_indices = columns
486                        .columns
487                        .iter()
488                        .map(|name| provider.schema().index_of(name))
489                        .collect::<Result<Vec<usize>, _>>()?;
490                    projection = Some(column_indices);
491                }
492
493                LogicalPlanBuilder::scan_with_filters(
494                    table_name,
495                    provider_as_source(Arc::new(provider)),
496                    projection,
497                    filters,
498                )?
499                .build()
500            }
501            LogicalPlanType::CustomScan(scan) => {
502                let schema: Schema = convert_required!(scan.schema)?;
503                let schema = Arc::new(schema);
504                let mut projection = None;
505                if let Some(columns) = &scan.projection {
506                    let column_indices = columns
507                        .columns
508                        .iter()
509                        .map(|name| schema.index_of(name))
510                        .collect::<Result<Vec<usize>, _>>()?;
511                    projection = Some(column_indices);
512                }
513
514                let filters =
515                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
516
517                let table_name =
518                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
519
520                let provider = extension_codec.try_decode_table_provider(
521                    &scan.custom_table_data,
522                    &table_name,
523                    schema,
524                    ctx,
525                )?;
526
527                LogicalPlanBuilder::scan_with_filters(
528                    table_name,
529                    provider_as_source(provider),
530                    projection,
531                    filters,
532                )?
533                .build()
534            }
535            LogicalPlanType::Sort(sort) => {
536                let input: LogicalPlan =
537                    into_logical_plan!(sort.input, ctx, extension_codec)?;
538                let sort_expr: Vec<SortExpr> =
539                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
540                let fetch: Option<usize> = sort.fetch.try_into().ok();
541                LogicalPlanBuilder::from(input)
542                    .sort_with_limit(sort_expr, fetch)?
543                    .build()
544            }
545            LogicalPlanType::Repartition(repartition) => {
546                use datafusion_expr::Partitioning;
547                let input: LogicalPlan =
548                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
549                use protobuf::repartition_node::PartitionMethod;
550                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
551                    internal_datafusion_err!(
552                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
553                    )
554                })?;
555
556                let partitioning_scheme = match pb_partition_method {
557                    PartitionMethod::Hash(protobuf::HashRepartition {
558                        hash_expr: pb_hash_expr,
559                        partition_count,
560                    }) => Partitioning::Hash(
561                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
562                        *partition_count as usize,
563                    ),
564                    PartitionMethod::RoundRobin(partition_count) => {
565                        Partitioning::RoundRobinBatch(*partition_count as usize)
566                    }
567                };
568
569                LogicalPlanBuilder::from(input)
570                    .repartition(partitioning_scheme)?
571                    .build()
572            }
573            LogicalPlanType::EmptyRelation(empty_relation) => {
574                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
575            }
576            LogicalPlanType::CreateExternalTable(create_extern_table) => {
577                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
578                    internal_datafusion_err!(
579                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
580                    )
581                })?;
582
583                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
584                    internal_datafusion_err!(
585                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints."
586                    )
587                })?;
588                let definition = if !create_extern_table.definition.is_empty() {
589                    Some(create_extern_table.definition.clone())
590                } else {
591                    None
592                };
593
594                let mut order_exprs = vec![];
595                for expr in &create_extern_table.order_exprs {
596                    order_exprs.push(from_proto::parse_sorts(
597                        &expr.sort_expr_nodes,
598                        ctx,
599                        extension_codec,
600                    )?);
601                }
602
603                let mut column_defaults =
604                    HashMap::with_capacity(create_extern_table.column_defaults.len());
605                for (col_name, expr) in &create_extern_table.column_defaults {
606                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
607                    column_defaults.insert(col_name.clone(), expr);
608                }
609
610                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
611                    CreateExternalTable::builder(
612                        from_table_reference(
613                            create_extern_table.name.as_ref(),
614                            "CreateExternalTable",
615                        )?,
616                        create_extern_table.location.clone(),
617                        create_extern_table.file_type.clone(),
618                        pb_schema.try_into()?,
619                    )
620                    .with_partition_cols(create_extern_table.table_partition_cols.clone())
621                    .with_order_exprs(order_exprs)
622                    .with_if_not_exists(create_extern_table.if_not_exists)
623                    .with_or_replace(create_extern_table.or_replace)
624                    .with_temporary(create_extern_table.temporary)
625                    .with_definition(definition)
626                    .with_unbounded(create_extern_table.unbounded)
627                    .with_options(create_extern_table.options.clone())
628                    .with_constraints(constraints.into())
629                    .with_column_defaults(column_defaults)
630                    .build(),
631                )))
632            }
633            LogicalPlanType::CreateView(create_view) => {
634                let plan = create_view
635                    .input.clone().ok_or_else(|| internal_datafusion_err!(
636                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input."
637                ))?
638                    .try_into_logical_plan(ctx, extension_codec)?;
639                let definition = if !create_view.definition.is_empty() {
640                    Some(create_view.definition.clone())
641                } else {
642                    None
643                };
644
645                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
646                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
647                    temporary: create_view.temporary,
648                    input: Arc::new(plan),
649                    or_replace: create_view.or_replace,
650                    definition,
651                })))
652            }
653            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
654                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
655                    internal_datafusion_err!(
656                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema."
657                    )
658                })?;
659
660                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
661                    CreateCatalogSchema {
662                        schema_name: create_catalog_schema.schema_name.clone(),
663                        if_not_exists: create_catalog_schema.if_not_exists,
664                        schema: pb_schema.try_into()?,
665                    },
666                )))
667            }
668            LogicalPlanType::CreateCatalog(create_catalog) => {
669                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
670                    internal_datafusion_err!(
671                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema."
672                    )
673                })?;
674
675                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
676                    CreateCatalog {
677                        catalog_name: create_catalog.catalog_name.clone(),
678                        if_not_exists: create_catalog.if_not_exists,
679                        schema: pb_schema.try_into()?,
680                    },
681                )))
682            }
683            LogicalPlanType::Analyze(analyze) => {
684                let input: LogicalPlan =
685                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
686                LogicalPlanBuilder::from(input)
687                    .explain(analyze.verbose, true)?
688                    .build()
689            }
690            LogicalPlanType::Explain(explain) => {
691                let input: LogicalPlan =
692                    into_logical_plan!(explain.input, ctx, extension_codec)?;
693                LogicalPlanBuilder::from(input)
694                    .explain(explain.verbose, false)?
695                    .build()
696            }
697            LogicalPlanType::SubqueryAlias(aliased_relation) => {
698                let input: LogicalPlan =
699                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
700                let alias = from_table_reference(
701                    aliased_relation.alias.as_ref(),
702                    "SubqueryAlias",
703                )?;
704                LogicalPlanBuilder::from(input).alias(alias)?.build()
705            }
706            LogicalPlanType::Limit(limit) => {
707                let input: LogicalPlan =
708                    into_logical_plan!(limit.input, ctx, extension_codec)?;
709                let skip = limit.skip.max(0) as usize;
710
711                let fetch = if limit.fetch < 0 {
712                    None
713                } else {
714                    Some(limit.fetch as usize)
715                };
716
717                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
718            }
719            LogicalPlanType::Join(join) => {
720                let left_keys: Vec<Expr> =
721                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
722                let right_keys: Vec<Expr> =
723                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
724                let join_type =
725                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
726                        proto_error(format!(
727                            "Received a JoinNode message with unknown JoinType {}",
728                            join.join_type
729                        ))
730                    })?;
731                let join_constraint = protobuf::JoinConstraint::try_from(
732                    join.join_constraint,
733                )
734                .map_err(|_| {
735                    proto_error(format!(
736                        "Received a JoinNode message with unknown JoinConstraint {}",
737                        join.join_constraint
738                    ))
739                })?;
740                let filter: Option<Expr> = join
741                    .filter
742                    .as_ref()
743                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
744                    .map_or(Ok(None), |v| v.map(Some))?;
745
746                let builder = LogicalPlanBuilder::from(into_logical_plan!(
747                    join.left,
748                    ctx,
749                    extension_codec
750                )?);
751                let builder = match join_constraint.into() {
752                    JoinConstraint::On => builder.join_with_expr_keys(
753                        into_logical_plan!(join.right, ctx, extension_codec)?,
754                        join_type.into(),
755                        (left_keys, right_keys),
756                        filter,
757                    )?,
758                    JoinConstraint::Using => {
759                        // The equijoin keys in a USING join must be column references.
760                        let using_keys = left_keys
761                            .into_iter()
762                            .map(|key| {
763                                key.try_as_col().cloned()
764                                    .ok_or_else(|| internal_datafusion_err!(
765                                        "Using join keys must be column references, got: {key:?}"
766                                    ))
767                            })
768                            .collect::<Result<Vec<_>, _>>()?;
769                        builder.join_using(
770                            into_logical_plan!(join.right, ctx, extension_codec)?,
771                            join_type.into(),
772                            using_keys,
773                        )?
774                    }
775                };
776
777                builder.build()
778            }
779            LogicalPlanType::Union(union) => {
780                assert_or_internal_err!(
781                    union.inputs.len() >= 2,
782                    "Protobuf deserialization error, Union requires at least two inputs."
783                );
784                let (first, rest) = union.inputs.split_first().unwrap();
785                let mut builder = LogicalPlanBuilder::from(
786                    first.try_into_logical_plan(ctx, extension_codec)?,
787                );
788
789                for i in rest {
790                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
791                    builder = builder.union(plan)?;
792                }
793                builder.build()
794            }
795            LogicalPlanType::CrossJoin(crossjoin) => {
796                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
797                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
798
799                LogicalPlanBuilder::from(left).cross_join(right)?.build()
800            }
801            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
802                let input_plans: Vec<LogicalPlan> = inputs
803                    .iter()
804                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
805                    .collect::<Result<_>>()?;
806
807                let extension_node =
808                    extension_codec.try_decode(node, &input_plans, ctx)?;
809                Ok(LogicalPlan::Extension(extension_node))
810            }
811            LogicalPlanType::Distinct(distinct) => {
812                let input: LogicalPlan =
813                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
814                LogicalPlanBuilder::from(input).distinct()?.build()
815            }
816            LogicalPlanType::DistinctOn(distinct_on) => {
817                let input: LogicalPlan =
818                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
819                let on_expr =
820                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
821                let select_expr = from_proto::parse_exprs(
822                    &distinct_on.select_expr,
823                    ctx,
824                    extension_codec,
825                )?;
826                let sort_expr = match distinct_on.sort_expr.len() {
827                    0 => None,
828                    _ => Some(from_proto::parse_sorts(
829                        &distinct_on.sort_expr,
830                        ctx,
831                        extension_codec,
832                    )?),
833                };
834                LogicalPlanBuilder::from(input)
835                    .distinct_on(on_expr, select_expr, sort_expr)?
836                    .build()
837            }
838            LogicalPlanType::ViewScan(scan) => {
839                let schema: Schema = convert_required!(scan.schema)?;
840
841                let mut projection = None;
842                if let Some(columns) = &scan.projection {
843                    let column_indices = columns
844                        .columns
845                        .iter()
846                        .map(|name| schema.index_of(name))
847                        .collect::<Result<Vec<usize>, _>>()?;
848                    projection = Some(column_indices);
849                }
850
851                let input: LogicalPlan =
852                    into_logical_plan!(scan.input, ctx, extension_codec)?;
853
854                let definition = if !scan.definition.is_empty() {
855                    Some(scan.definition.clone())
856                } else {
857                    None
858                };
859
860                let provider = ViewTable::new(input, definition);
861
862                let table_name =
863                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
864
865                LogicalPlanBuilder::scan(
866                    table_name,
867                    provider_as_source(Arc::new(provider)),
868                    projection,
869                )?
870                .build()
871            }
872            LogicalPlanType::Prepare(prepare) => {
873                let input: LogicalPlan =
874                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
875                let data_types: Vec<DataType> = prepare
876                    .data_types
877                    .iter()
878                    .map(DataType::try_from)
879                    .collect::<Result<_, _>>()?;
880                let fields: Vec<Field> = prepare
881                    .fields
882                    .iter()
883                    .map(Field::try_from)
884                    .collect::<Result<_, _>>()?;
885
886                // If the fields are empty this may have been generated by an
887                // earlier version of DataFusion, in which case the DataTypes
888                // can be used to construct the plan.
889                if fields.is_empty() {
890                    LogicalPlanBuilder::from(input)
891                        .prepare(
892                            prepare.name.clone(),
893                            data_types
894                                .into_iter()
895                                .map(|dt| Field::new("", dt, true).into())
896                                .collect(),
897                        )?
898                        .build()
899                } else {
900                    LogicalPlanBuilder::from(input)
901                        .prepare(
902                            prepare.name.clone(),
903                            fields.into_iter().map(|f| f.into()).collect(),
904                        )?
905                        .build()
906                }
907            }
908            LogicalPlanType::DropView(dropview) => {
909                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
910                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
911                    if_exists: dropview.if_exists,
912                    schema: Arc::new(convert_required!(dropview.schema)?),
913                })))
914            }
915            LogicalPlanType::CopyTo(copy) => {
916                let input: LogicalPlan =
917                    into_logical_plan!(copy.input, ctx, extension_codec)?;
918
919                let file_type: Arc<dyn FileType> = format_as_file_type(
920                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
921                );
922
923                Ok(LogicalPlan::Copy(dml::CopyTo::new(
924                    Arc::new(input),
925                    copy.output_url.clone(),
926                    copy.partition_by.clone(),
927                    file_type,
928                    Default::default(),
929                )))
930            }
931            LogicalPlanType::Unnest(unnest) => {
932                let input: LogicalPlan =
933                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
934
935                LogicalPlanBuilder::from(input)
936                    .unnest_columns_with_options(
937                        unnest.exec_columns.iter().map(|c| c.into()).collect(),
938                        into_required!(unnest.options)?,
939                    )?
940                    .build()
941            }
942            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
943                let static_term = recursive_query_node
944                    .static_term
945                    .as_ref()
946                    .ok_or_else(|| internal_datafusion_err!(
947                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term."
948                    ))?
949                    .try_into_logical_plan(ctx, extension_codec)?;
950
951                let recursive_term = recursive_query_node
952                    .recursive_term
953                    .as_ref()
954                    .ok_or_else(|| internal_datafusion_err!(
955                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term."
956                    ))?
957                    .try_into_logical_plan(ctx, extension_codec)?;
958
959                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
960                    name: recursive_query_node.name.clone(),
961                    static_term: Arc::new(static_term),
962                    recursive_term: Arc::new(recursive_term),
963                    is_distinct: recursive_query_node.is_distinct,
964                }))
965            }
966            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
967                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
968                let schema = convert_required!(*schema)?;
969                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
970                LogicalPlanBuilder::scan(
971                    name.as_str(),
972                    provider_as_source(Arc::new(cte_work_table)),
973                    None,
974                )?
975                .build()
976            }
977            LogicalPlanType::Dml(dml_node) => {
978                Ok(LogicalPlan::Dml(datafusion_expr::DmlStatement::new(
979                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
980                    to_table_source(&dml_node.target, ctx, extension_codec)?,
981                    dml_node.dml_type().into(),
982                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
983                )))
984            }
985        }
986    }
987
988    fn try_from_logical_plan(
989        plan: &LogicalPlan,
990        extension_codec: &dyn LogicalExtensionCodec,
991    ) -> Result<Self>
992    where
993        Self: Sized,
994    {
995        match plan {
996            LogicalPlan::Values(Values { values, .. }) => {
997                let n_cols = if values.is_empty() {
998                    0
999                } else {
1000                    values[0].len()
1001                } as u64;
1002                let values_list =
1003                    serialize_exprs(values.iter().flatten(), extension_codec)?;
1004                Ok(LogicalPlanNode {
1005                    logical_plan_type: Some(LogicalPlanType::Values(
1006                        protobuf::ValuesNode {
1007                            n_cols,
1008                            values_list,
1009                        },
1010                    )),
1011                })
1012            }
1013            LogicalPlan::TableScan(TableScan {
1014                table_name,
1015                source,
1016                filters,
1017                projection,
1018                ..
1019            }) => {
1020                let provider = source_as_provider(source)?;
1021                let schema = provider.schema();
1022                let source = provider.as_any();
1023
1024                let projection = match projection {
1025                    None => None,
1026                    Some(columns) => {
1027                        let column_names = columns
1028                            .iter()
1029                            .map(|i| schema.field(*i).name().to_owned())
1030                            .collect();
1031                        Some(protobuf::ProjectionColumns {
1032                            columns: column_names,
1033                        })
1034                    }
1035                };
1036
1037                let filters: Vec<protobuf::LogicalExprNode> =
1038                    serialize_exprs(filters, extension_codec)?;
1039
1040                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1041                    let any = listing_table.options().format.as_any();
1042                    let file_format_type = {
1043                        let mut maybe_some_type = None;
1044
1045                        #[cfg(feature = "parquet")]
1046                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1047                            let options = parquet.options();
1048                            maybe_some_type =
1049                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1050                                    options: Some(options.try_into()?),
1051                                }));
1052                        };
1053
1054                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1055                            let options = csv.options();
1056                            maybe_some_type =
1057                                Some(FileFormatType::Csv(protobuf::CsvFormat {
1058                                    options: Some(options.try_into()?),
1059                                }));
1060                        }
1061
1062                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1063                            let options = json.options();
1064                            maybe_some_type =
1065                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
1066                                    options: Some(options.try_into()?),
1067                                }))
1068                        }
1069
1070                        #[cfg(feature = "avro")]
1071                        if any.is::<AvroFormat>() {
1072                            maybe_some_type =
1073                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1074                        }
1075
1076                        if any.is::<ArrowFormat>() {
1077                            maybe_some_type =
1078                                Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1079                        }
1080
1081                        if let Some(file_format_type) = maybe_some_type {
1082                            file_format_type
1083                        } else {
1084                            return Err(proto_error(format!(
1085                                "Error deserializing unknown file format: {:?}",
1086                                listing_table.options().format
1087                            )));
1088                        }
1089                    };
1090
1091                    let options = listing_table.options();
1092
1093                    let mut builder = SchemaBuilder::from(schema.as_ref());
1094                    for (idx, field) in schema.fields().iter().enumerate().rev() {
1095                        if options
1096                            .table_partition_cols
1097                            .iter()
1098                            .any(|(name, _)| name == field.name())
1099                        {
1100                            builder.remove(idx);
1101                        }
1102                    }
1103
1104                    let schema = builder.finish();
1105
1106                    let schema: protobuf::Schema = (&schema).try_into()?;
1107
1108                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1109                    for order in &options.file_sort_order {
1110                        let expr_vec = SortExprNodeCollection {
1111                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1112                        };
1113                        exprs_vec.push(expr_vec);
1114                    }
1115
1116                    let partition_columns = options
1117                        .table_partition_cols
1118                        .iter()
1119                        .map(|(name, arrow_type)| {
1120                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1121                                .map_err(|e| {
1122                                    proto_error(format!(
1123                                        "Received an unknown ArrowType: {e}"
1124                                    ))
1125                                })?;
1126                            Ok(protobuf::PartitionColumn {
1127                                name: name.clone(),
1128                                arrow_type: Some(arrow_type),
1129                            })
1130                        })
1131                        .collect::<Result<Vec<_>>>()?;
1132
1133                    Ok(LogicalPlanNode {
1134                        logical_plan_type: Some(LogicalPlanType::ListingScan(
1135                            protobuf::ListingTableScanNode {
1136                                file_format_type: Some(file_format_type),
1137                                table_name: Some(table_name.clone().into()),
1138                                collect_stat: options.collect_stat,
1139                                file_extension: options.file_extension.clone(),
1140                                table_partition_cols: partition_columns,
1141                                paths: listing_table
1142                                    .table_paths()
1143                                    .iter()
1144                                    .map(|x| x.to_string())
1145                                    .collect(),
1146                                schema: Some(schema),
1147                                projection,
1148                                filters,
1149                                target_partitions: options.target_partitions as u32,
1150                                file_sort_order: exprs_vec,
1151                            },
1152                        )),
1153                    })
1154                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1155                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1156                    Ok(LogicalPlanNode {
1157                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1158                            protobuf::ViewTableScanNode {
1159                                table_name: Some(table_name.clone().into()),
1160                                input: Some(Box::new(
1161                                    LogicalPlanNode::try_from_logical_plan(
1162                                        view_table.logical_plan(),
1163                                        extension_codec,
1164                                    )?,
1165                                )),
1166                                schema: Some(schema),
1167                                projection,
1168                                definition: view_table
1169                                    .definition()
1170                                    .map(|s| s.to_string())
1171                                    .unwrap_or_default(),
1172                            },
1173                        ))),
1174                    })
1175                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1176                {
1177                    let name = cte_work_table.name().to_string();
1178                    let schema = cte_work_table.schema();
1179                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1180
1181                    Ok(LogicalPlanNode {
1182                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1183                            protobuf::CteWorkTableScanNode {
1184                                name,
1185                                schema: Some(schema),
1186                            },
1187                        )),
1188                    })
1189                } else {
1190                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1191                    let mut bytes = vec![];
1192                    extension_codec
1193                        .try_encode_table_provider(table_name, provider, &mut bytes)
1194                        .map_err(|e| context!("Error serializing custom table", e))?;
1195                    let scan = CustomScan(CustomTableScanNode {
1196                        table_name: Some(table_name.clone().into()),
1197                        projection,
1198                        schema: Some(schema),
1199                        filters,
1200                        custom_table_data: bytes,
1201                    });
1202                    let node = LogicalPlanNode {
1203                        logical_plan_type: Some(scan),
1204                    };
1205                    Ok(node)
1206                }
1207            }
1208            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1209                Ok(LogicalPlanNode {
1210                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1211                        protobuf::ProjectionNode {
1212                            input: Some(Box::new(
1213                                LogicalPlanNode::try_from_logical_plan(
1214                                    input.as_ref(),
1215                                    extension_codec,
1216                                )?,
1217                            )),
1218                            expr: serialize_exprs(expr, extension_codec)?,
1219                            optional_alias: None,
1220                        },
1221                    ))),
1222                })
1223            }
1224            LogicalPlan::Filter(filter) => {
1225                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1226                    filter.input.as_ref(),
1227                    extension_codec,
1228                )?;
1229                Ok(LogicalPlanNode {
1230                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1231                        protobuf::SelectionNode {
1232                            input: Some(Box::new(input)),
1233                            expr: Some(serialize_expr(
1234                                &filter.predicate,
1235                                extension_codec,
1236                            )?),
1237                        },
1238                    ))),
1239                })
1240            }
1241            LogicalPlan::Distinct(Distinct::All(input)) => {
1242                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1243                    input.as_ref(),
1244                    extension_codec,
1245                )?;
1246                Ok(LogicalPlanNode {
1247                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1248                        protobuf::DistinctNode {
1249                            input: Some(Box::new(input)),
1250                        },
1251                    ))),
1252                })
1253            }
1254            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1255                on_expr,
1256                select_expr,
1257                sort_expr,
1258                input,
1259                ..
1260            })) => {
1261                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1262                    input.as_ref(),
1263                    extension_codec,
1264                )?;
1265                let sort_expr = match sort_expr {
1266                    None => vec![],
1267                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1268                };
1269                Ok(LogicalPlanNode {
1270                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1271                        protobuf::DistinctOnNode {
1272                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1273                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1274                            sort_expr,
1275                            input: Some(Box::new(input)),
1276                        },
1277                    ))),
1278                })
1279            }
1280            LogicalPlan::Window(Window {
1281                input, window_expr, ..
1282            }) => {
1283                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1284                    input.as_ref(),
1285                    extension_codec,
1286                )?;
1287                Ok(LogicalPlanNode {
1288                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1289                        protobuf::WindowNode {
1290                            input: Some(Box::new(input)),
1291                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1292                        },
1293                    ))),
1294                })
1295            }
1296            LogicalPlan::Aggregate(Aggregate {
1297                group_expr,
1298                aggr_expr,
1299                input,
1300                ..
1301            }) => {
1302                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1303                    input.as_ref(),
1304                    extension_codec,
1305                )?;
1306                Ok(LogicalPlanNode {
1307                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1308                        protobuf::AggregateNode {
1309                            input: Some(Box::new(input)),
1310                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1311                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1312                        },
1313                    ))),
1314                })
1315            }
1316            LogicalPlan::Join(Join {
1317                left,
1318                right,
1319                on,
1320                filter,
1321                join_type,
1322                join_constraint,
1323                null_equality,
1324                ..
1325            }) => {
1326                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1327                    left.as_ref(),
1328                    extension_codec,
1329                )?;
1330                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1331                    right.as_ref(),
1332                    extension_codec,
1333                )?;
1334                let (left_join_key, right_join_key) = on
1335                    .iter()
1336                    .map(|(l, r)| {
1337                        Ok((
1338                            serialize_expr(l, extension_codec)?,
1339                            serialize_expr(r, extension_codec)?,
1340                        ))
1341                    })
1342                    .collect::<Result<Vec<_>, ToProtoError>>()?
1343                    .into_iter()
1344                    .unzip();
1345                let join_type: protobuf::JoinType = join_type.to_owned().into();
1346                let join_constraint: protobuf::JoinConstraint =
1347                    join_constraint.to_owned().into();
1348                let null_equality: protobuf::NullEquality =
1349                    null_equality.to_owned().into();
1350                let filter = filter
1351                    .as_ref()
1352                    .map(|e| serialize_expr(e, extension_codec))
1353                    .map_or(Ok(None), |v| v.map(Some))?;
1354                Ok(LogicalPlanNode {
1355                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1356                        protobuf::JoinNode {
1357                            left: Some(Box::new(left)),
1358                            right: Some(Box::new(right)),
1359                            join_type: join_type.into(),
1360                            join_constraint: join_constraint.into(),
1361                            left_join_key,
1362                            right_join_key,
1363                            null_equality: null_equality.into(),
1364                            filter,
1365                        },
1366                    ))),
1367                })
1368            }
1369            LogicalPlan::Subquery(_) => {
1370                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1371            }
1372            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1373                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1374                    input.as_ref(),
1375                    extension_codec,
1376                )?;
1377                Ok(LogicalPlanNode {
1378                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1379                        protobuf::SubqueryAliasNode {
1380                            input: Some(Box::new(input)),
1381                            alias: Some((*alias).clone().into()),
1382                        },
1383                    ))),
1384                })
1385            }
1386            LogicalPlan::Limit(limit) => {
1387                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1388                    limit.input.as_ref(),
1389                    extension_codec,
1390                )?;
1391                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1392                    return Err(proto_error(
1393                        "LogicalPlan::Limit only supports literal skip values",
1394                    ));
1395                };
1396                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1397                    return Err(proto_error(
1398                        "LogicalPlan::Limit only supports literal fetch values",
1399                    ));
1400                };
1401
1402                Ok(LogicalPlanNode {
1403                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1404                        protobuf::LimitNode {
1405                            input: Some(Box::new(input)),
1406                            skip: skip as i64,
1407                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1408                        },
1409                    ))),
1410                })
1411            }
1412            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1413                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1414                    input.as_ref(),
1415                    extension_codec,
1416                )?;
1417                let sort_expr: Vec<protobuf::SortExprNode> =
1418                    serialize_sorts(expr, extension_codec)?;
1419                Ok(LogicalPlanNode {
1420                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1421                        protobuf::SortNode {
1422                            input: Some(Box::new(input)),
1423                            expr: sort_expr,
1424                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1425                        },
1426                    ))),
1427                })
1428            }
1429            LogicalPlan::Repartition(Repartition {
1430                input,
1431                partitioning_scheme,
1432            }) => {
1433                use datafusion_expr::Partitioning;
1434                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1435                    input.as_ref(),
1436                    extension_codec,
1437                )?;
1438
1439                // Assumed common usize field was batch size
1440                // Used u64 to avoid any nastiness involving large values, most data clusters are probably uniformly 64 bits any ways
1441                use protobuf::repartition_node::PartitionMethod;
1442
1443                let pb_partition_method = match partitioning_scheme {
1444                    Partitioning::Hash(exprs, partition_count) => {
1445                        PartitionMethod::Hash(protobuf::HashRepartition {
1446                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1447                            partition_count: *partition_count as u64,
1448                        })
1449                    }
1450                    Partitioning::RoundRobinBatch(partition_count) => {
1451                        PartitionMethod::RoundRobin(*partition_count as u64)
1452                    }
1453                    Partitioning::DistributeBy(_) => {
1454                        return not_impl_err!("DistributeBy");
1455                    }
1456                };
1457
1458                Ok(LogicalPlanNode {
1459                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1460                        protobuf::RepartitionNode {
1461                            input: Some(Box::new(input)),
1462                            partition_method: Some(pb_partition_method),
1463                        },
1464                    ))),
1465                })
1466            }
1467            LogicalPlan::EmptyRelation(EmptyRelation {
1468                produce_one_row, ..
1469            }) => Ok(LogicalPlanNode {
1470                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1471                    protobuf::EmptyRelationNode {
1472                        produce_one_row: *produce_one_row,
1473                    },
1474                )),
1475            }),
1476            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1477                CreateExternalTable {
1478                    name,
1479                    location,
1480                    file_type,
1481                    schema: df_schema,
1482                    table_partition_cols,
1483                    if_not_exists,
1484                    or_replace,
1485                    definition,
1486                    order_exprs,
1487                    unbounded,
1488                    options,
1489                    constraints,
1490                    column_defaults,
1491                    temporary,
1492                },
1493            )) => {
1494                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1495                for order in order_exprs {
1496                    let temp = SortExprNodeCollection {
1497                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1498                    };
1499                    converted_order_exprs.push(temp);
1500                }
1501
1502                let mut converted_column_defaults =
1503                    HashMap::with_capacity(column_defaults.len());
1504                for (col_name, expr) in column_defaults {
1505                    converted_column_defaults
1506                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1507                }
1508
1509                Ok(LogicalPlanNode {
1510                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1511                        protobuf::CreateExternalTableNode {
1512                            name: Some(name.clone().into()),
1513                            location: location.clone(),
1514                            file_type: file_type.clone(),
1515                            schema: Some(df_schema.try_into()?),
1516                            table_partition_cols: table_partition_cols.clone(),
1517                            if_not_exists: *if_not_exists,
1518                            or_replace: *or_replace,
1519                            temporary: *temporary,
1520                            order_exprs: converted_order_exprs,
1521                            definition: definition.clone().unwrap_or_default(),
1522                            unbounded: *unbounded,
1523                            options: options.clone(),
1524                            constraints: Some(constraints.clone().into()),
1525                            column_defaults: converted_column_defaults,
1526                        },
1527                    )),
1528                })
1529            }
1530            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1531                name,
1532                input,
1533                or_replace,
1534                definition,
1535                temporary,
1536            })) => Ok(LogicalPlanNode {
1537                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1538                    protobuf::CreateViewNode {
1539                        name: Some(name.clone().into()),
1540                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1541                            input,
1542                            extension_codec,
1543                        )?)),
1544                        or_replace: *or_replace,
1545                        temporary: *temporary,
1546                        definition: definition.clone().unwrap_or_default(),
1547                    },
1548                ))),
1549            }),
1550            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1551                CreateCatalogSchema {
1552                    schema_name,
1553                    if_not_exists,
1554                    schema: df_schema,
1555                },
1556            )) => Ok(LogicalPlanNode {
1557                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1558                    protobuf::CreateCatalogSchemaNode {
1559                        schema_name: schema_name.clone(),
1560                        if_not_exists: *if_not_exists,
1561                        schema: Some(df_schema.try_into()?),
1562                    },
1563                )),
1564            }),
1565            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1566                catalog_name,
1567                if_not_exists,
1568                schema: df_schema,
1569            })) => Ok(LogicalPlanNode {
1570                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1571                    protobuf::CreateCatalogNode {
1572                        catalog_name: catalog_name.clone(),
1573                        if_not_exists: *if_not_exists,
1574                        schema: Some(df_schema.try_into()?),
1575                    },
1576                )),
1577            }),
1578            LogicalPlan::Analyze(a) => {
1579                let input = LogicalPlanNode::try_from_logical_plan(
1580                    a.input.as_ref(),
1581                    extension_codec,
1582                )?;
1583                Ok(LogicalPlanNode {
1584                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1585                        protobuf::AnalyzeNode {
1586                            input: Some(Box::new(input)),
1587                            verbose: a.verbose,
1588                        },
1589                    ))),
1590                })
1591            }
1592            LogicalPlan::Explain(a) => {
1593                let input = LogicalPlanNode::try_from_logical_plan(
1594                    a.plan.as_ref(),
1595                    extension_codec,
1596                )?;
1597                Ok(LogicalPlanNode {
1598                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1599                        protobuf::ExplainNode {
1600                            input: Some(Box::new(input)),
1601                            verbose: a.verbose,
1602                        },
1603                    ))),
1604                })
1605            }
1606            LogicalPlan::Union(union) => {
1607                let inputs: Vec<LogicalPlanNode> = union
1608                    .inputs
1609                    .iter()
1610                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1611                    .collect::<Result<_>>()?;
1612                Ok(LogicalPlanNode {
1613                    logical_plan_type: Some(LogicalPlanType::Union(
1614                        protobuf::UnionNode { inputs },
1615                    )),
1616                })
1617            }
1618            LogicalPlan::Extension(extension) => {
1619                let mut buf: Vec<u8> = vec![];
1620                extension_codec.try_encode(extension, &mut buf)?;
1621
1622                let inputs: Vec<LogicalPlanNode> = extension
1623                    .node
1624                    .inputs()
1625                    .iter()
1626                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1627                    .collect::<Result<_>>()?;
1628
1629                Ok(LogicalPlanNode {
1630                    logical_plan_type: Some(LogicalPlanType::Extension(
1631                        LogicalExtensionNode { node: buf, inputs },
1632                    )),
1633                })
1634            }
1635            LogicalPlan::Statement(Statement::Prepare(Prepare {
1636                name,
1637                fields,
1638                input,
1639            })) => {
1640                let input =
1641                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1642                Ok(LogicalPlanNode {
1643                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1644                        protobuf::PrepareNode {
1645                            name: name.clone(),
1646                            input: Some(Box::new(input)),
1647                            // Store the DataTypes for reading by older DataFusion
1648                            data_types: fields
1649                                .iter()
1650                                .map(|f| f.data_type().try_into())
1651                                .collect::<Result<Vec<_>, _>>()?,
1652                            // Store the Fields for current and future DataFusion
1653                            fields: fields
1654                                .iter()
1655                                .map(|f| f.as_ref().try_into())
1656                                .collect::<Result<Vec<_>, _>>()?,
1657                        },
1658                    ))),
1659                })
1660            }
1661            LogicalPlan::Unnest(Unnest {
1662                input,
1663                exec_columns,
1664                list_type_columns,
1665                struct_type_columns,
1666                dependency_indices,
1667                schema,
1668                options,
1669            }) => {
1670                let input =
1671                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1672                let proto_unnest_list_items = list_type_columns
1673                    .iter()
1674                    .map(|(index, ul)| ColumnUnnestListItem {
1675                        input_index: *index as _,
1676                        recursion: Some(ColumnUnnestListRecursion {
1677                            output_column: Some(ul.output_column.to_owned().into()),
1678                            depth: ul.depth as _,
1679                        }),
1680                    })
1681                    .collect();
1682                Ok(LogicalPlanNode {
1683                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1684                        protobuf::UnnestNode {
1685                            input: Some(Box::new(input)),
1686                            exec_columns: exec_columns
1687                                .iter()
1688                                .map(|col| col.into())
1689                                .collect(),
1690                            list_type_columns: proto_unnest_list_items,
1691                            struct_type_columns: struct_type_columns
1692                                .iter()
1693                                .map(|c| *c as u64)
1694                                .collect(),
1695                            dependency_indices: dependency_indices
1696                                .iter()
1697                                .map(|c| *c as u64)
1698                                .collect(),
1699                            schema: Some(schema.try_into()?),
1700                            options: Some(options.into()),
1701                        },
1702                    ))),
1703                })
1704            }
1705            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1706                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1707            )),
1708            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1709                "LogicalPlan serde is not yet implemented for CreateIndex",
1710            )),
1711            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1712                "LogicalPlan serde is not yet implemented for DropTable",
1713            )),
1714            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1715                name,
1716                if_exists,
1717                schema,
1718            })) => Ok(LogicalPlanNode {
1719                logical_plan_type: Some(LogicalPlanType::DropView(
1720                    protobuf::DropViewNode {
1721                        name: Some(name.clone().into()),
1722                        if_exists: *if_exists,
1723                        schema: Some(schema.try_into()?),
1724                    },
1725                )),
1726            }),
1727            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1728                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1729            )),
1730            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1731                "LogicalPlan serde is not yet implemented for CreateFunction",
1732            )),
1733            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1734                "LogicalPlan serde is not yet implemented for DropFunction",
1735            )),
1736            LogicalPlan::Statement(_) => Err(proto_error(
1737                "LogicalPlan serde is not yet implemented for Statement",
1738            )),
1739            LogicalPlan::Dml(DmlStatement {
1740                table_name,
1741                target,
1742                op,
1743                input,
1744                ..
1745            }) => {
1746                let input =
1747                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1748                let dml_type: dml_node::Type = op.into();
1749                Ok(LogicalPlanNode {
1750                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1751                        input: Some(Box::new(input)),
1752                        target: Some(Box::new(from_table_source(
1753                            table_name.clone(),
1754                            Arc::clone(target),
1755                            extension_codec,
1756                        )?)),
1757                        table_name: Some(table_name.clone().into()),
1758                        dml_type: dml_type.into(),
1759                    }))),
1760                })
1761            }
1762            LogicalPlan::Copy(dml::CopyTo {
1763                input,
1764                output_url,
1765                file_type,
1766                partition_by,
1767                ..
1768            }) => {
1769                let input =
1770                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1771                let mut buf = Vec::new();
1772                extension_codec
1773                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1774
1775                Ok(LogicalPlanNode {
1776                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1777                        protobuf::CopyToNode {
1778                            input: Some(Box::new(input)),
1779                            output_url: output_url.to_string(),
1780                            file_type: buf,
1781                            partition_by: partition_by.clone(),
1782                        },
1783                    ))),
1784                })
1785            }
1786            LogicalPlan::DescribeTable(_) => Err(proto_error(
1787                "LogicalPlan serde is not yet implemented for DescribeTable",
1788            )),
1789            LogicalPlan::RecursiveQuery(recursive) => {
1790                let static_term = LogicalPlanNode::try_from_logical_plan(
1791                    recursive.static_term.as_ref(),
1792                    extension_codec,
1793                )?;
1794                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1795                    recursive.recursive_term.as_ref(),
1796                    extension_codec,
1797                )?;
1798
1799                Ok(LogicalPlanNode {
1800                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1801                        protobuf::RecursiveQueryNode {
1802                            name: recursive.name.clone(),
1803                            static_term: Some(Box::new(static_term)),
1804                            recursive_term: Some(Box::new(recursive_term)),
1805                            is_distinct: recursive.is_distinct,
1806                        },
1807                    ))),
1808                })
1809            }
1810        }
1811    }
1812}