datafusion_proto/logical_plan/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, listing_table_scan_node::FileFormatType,
31        logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32    },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38use datafusion::datasource::file_format::arrow::ArrowFormat;
39#[cfg(feature = "avro")]
40use datafusion::datasource::file_format::avro::AvroFormat;
41#[cfg(feature = "parquet")]
42use datafusion::datasource::file_format::parquet::ParquetFormat;
43use datafusion::datasource::file_format::{
44    file_type_to_format, format_as_file_type, FileFormatFactory,
45};
46use datafusion::{
47    datasource::{
48        file_format::{
49            csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
50        },
51        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
52        view::ViewTable,
53        TableProvider,
54    },
55    datasource::{provider_as_source, source_as_provider},
56    prelude::SessionContext,
57};
58use datafusion_common::file_options::file_type::FileType;
59use datafusion_common::{
60    context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
61    DataFusionError, Result, TableReference, ToDFSchema,
62};
63use datafusion_expr::{
64    dml,
65    logical_plan::{
66        builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
67        CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
68        Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
69        SubqueryAlias, TableScan, Values, Window,
70    },
71    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
72    Statement, WindowUDF,
73};
74use datafusion_expr::{
75    AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
76};
77
78use self::to_proto::{serialize_expr, serialize_exprs};
79use crate::logical_plan::to_proto::serialize_sorts;
80use prost::bytes::BufMut;
81use prost::Message;
82
83pub mod file_formats;
84pub mod from_proto;
85pub mod to_proto;
86
87pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
88    fn try_decode(buf: &[u8]) -> Result<Self>
89    where
90        Self: Sized;
91
92    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
93    where
94        B: BufMut,
95        Self: Sized;
96
97    fn try_into_logical_plan(
98        &self,
99        ctx: &SessionContext,
100        extension_codec: &dyn LogicalExtensionCodec,
101    ) -> Result<LogicalPlan>;
102
103    fn try_from_logical_plan(
104        plan: &LogicalPlan,
105        extension_codec: &dyn LogicalExtensionCodec,
106    ) -> Result<Self>
107    where
108        Self: Sized;
109}
110
111pub trait LogicalExtensionCodec: Debug + Send + Sync {
112    fn try_decode(
113        &self,
114        buf: &[u8],
115        inputs: &[LogicalPlan],
116        ctx: &SessionContext,
117    ) -> Result<Extension>;
118
119    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
120
121    fn try_decode_table_provider(
122        &self,
123        buf: &[u8],
124        table_ref: &TableReference,
125        schema: SchemaRef,
126        ctx: &SessionContext,
127    ) -> Result<Arc<dyn TableProvider>>;
128
129    fn try_encode_table_provider(
130        &self,
131        table_ref: &TableReference,
132        node: Arc<dyn TableProvider>,
133        buf: &mut Vec<u8>,
134    ) -> Result<()>;
135
136    fn try_decode_file_format(
137        &self,
138        _buf: &[u8],
139        _ctx: &SessionContext,
140    ) -> Result<Arc<dyn FileFormatFactory>> {
141        not_impl_err!("LogicalExtensionCodec is not provided for file format")
142    }
143
144    fn try_encode_file_format(
145        &self,
146        _buf: &mut Vec<u8>,
147        _node: Arc<dyn FileFormatFactory>,
148    ) -> Result<()> {
149        Ok(())
150    }
151
152    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
153        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
154    }
155
156    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
157        Ok(())
158    }
159
160    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
161        not_impl_err!(
162            "LogicalExtensionCodec is not provided for aggregate function {name}"
163        )
164    }
165
166    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
167        Ok(())
168    }
169
170    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
171        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
172    }
173
174    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
175        Ok(())
176    }
177}
178
179#[derive(Debug, Clone)]
180pub struct DefaultLogicalExtensionCodec {}
181
182impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
183    fn try_decode(
184        &self,
185        _buf: &[u8],
186        _inputs: &[LogicalPlan],
187        _ctx: &SessionContext,
188    ) -> Result<Extension> {
189        not_impl_err!("LogicalExtensionCodec is not provided")
190    }
191
192    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
193        not_impl_err!("LogicalExtensionCodec is not provided")
194    }
195
196    fn try_decode_table_provider(
197        &self,
198        _buf: &[u8],
199        _table_ref: &TableReference,
200        _schema: SchemaRef,
201        _ctx: &SessionContext,
202    ) -> Result<Arc<dyn TableProvider>> {
203        not_impl_err!("LogicalExtensionCodec is not provided")
204    }
205
206    fn try_encode_table_provider(
207        &self,
208        _table_ref: &TableReference,
209        _node: Arc<dyn TableProvider>,
210        _buf: &mut Vec<u8>,
211    ) -> Result<()> {
212        not_impl_err!("LogicalExtensionCodec is not provided")
213    }
214}
215
/// Decodes an optional child plan field of a protobuf node into a
/// `LogicalPlan`.
///
/// * `$PB` - the optional field holding the child node; it is borrowed with
///   `as_ref()` and never consumed (typically an `Option<Box<LogicalPlanNode>>`
///   — confirm against the generated message types).
/// * `$CTX` - the session context forwarded to `try_into_logical_plan`.
/// * `$CODEC` - the extension codec forwarded to `try_into_logical_plan`.
///
/// Evaluates to the `Result` of `try_into_logical_plan`, or to an `Err` built
/// by `proto_error` when the field is `None`.
///
/// The expansion calls `try_into_logical_plan` with method syntax, so the
/// `AsLogicalPlan` trait must be in scope at the call site.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            field.as_ref().try_into_logical_plan($CTX, $CODEC)
        } else {
            // Resolve `proto_error` through `$crate` so the expansion does not
            // depend on the call site having imported it.
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
226
227fn from_table_reference(
228    table_ref: Option<&protobuf::TableReference>,
229    error_context: &str,
230) -> Result<TableReference> {
231    let table_ref = table_ref.ok_or_else(|| {
232        DataFusionError::Internal(format!(
233            "Protobuf deserialization error, {error_context} was missing required field name."
234        ))
235    })?;
236
237    Ok(table_ref.clone().try_into()?)
238}
239
240/// Converts [LogicalPlan::TableScan] to [TableSource]
241/// method to be used to deserialize nodes
242/// serialized by [from_table_source]
243fn to_table_source(
244    node: &Option<Box<LogicalPlanNode>>,
245    ctx: &SessionContext,
246    extension_codec: &dyn LogicalExtensionCodec,
247) -> Result<Arc<dyn TableSource>> {
248    if let Some(node) = node {
249        match node.try_into_logical_plan(ctx, extension_codec)? {
250            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
251            _ => plan_err!("expected TableScan node"),
252        }
253    } else {
254        plan_err!("LogicalPlanNode should be provided")
255    }
256}
257
258/// converts [TableSource] to [LogicalPlan::TableScan]
259/// using [LogicalPlan::TableScan] was the best approach to
260/// serialize [TableSource] to [LogicalPlan::TableScan]
261fn from_table_source(
262    table_name: TableReference,
263    target: Arc<dyn TableSource>,
264    extension_codec: &dyn LogicalExtensionCodec,
265) -> Result<LogicalPlanNode> {
266    let projected_schema = target.schema().to_dfschema_ref()?;
267    let r = LogicalPlan::TableScan(TableScan {
268        table_name,
269        source: target,
270        projection: None,
271        projected_schema,
272        filters: vec![],
273        fetch: None,
274    });
275
276    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
277}
278
279impl AsLogicalPlan for LogicalPlanNode {
280    fn try_decode(buf: &[u8]) -> Result<Self>
281    where
282        Self: Sized,
283    {
284        LogicalPlanNode::decode(buf).map_err(|e| {
285            DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
286        })
287    }
288
289    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
290    where
291        B: BufMut,
292        Self: Sized,
293    {
294        self.encode(buf).map_err(|e| {
295            DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
296        })
297    }
298
299    fn try_into_logical_plan(
300        &self,
301        ctx: &SessionContext,
302        extension_codec: &dyn LogicalExtensionCodec,
303    ) -> Result<LogicalPlan> {
304        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
305            proto_error(format!(
306                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
307            ))
308        })?;
309        match plan {
310            LogicalPlanType::Values(values) => {
311                let n_cols = values.n_cols as usize;
312                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
313                    Ok(Vec::new())
314                } else if values.values_list.len() % n_cols != 0 {
315                    internal_err!(
316                        "Invalid values list length, expect {} to be divisible by {}",
317                        values.values_list.len(),
318                        n_cols
319                    )
320                } else {
321                    values
322                        .values_list
323                        .chunks_exact(n_cols)
324                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
325                        .collect::<Result<Vec<_>, _>>()
326                        .map_err(|e| e.into())
327                }?;
328
329                LogicalPlanBuilder::values(values)?.build()
330            }
331            LogicalPlanType::Projection(projection) => {
332                let input: LogicalPlan =
333                    into_logical_plan!(projection.input, ctx, extension_codec)?;
334                let expr: Vec<Expr> =
335                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
336
337                let new_proj = project(input, expr)?;
338                match projection.optional_alias.as_ref() {
339                    Some(a) => match a {
340                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
341                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
342                                Arc::new(new_proj),
343                                alias.clone(),
344                            )?))
345                        }
346                    },
347                    _ => Ok(new_proj),
348                }
349            }
350            LogicalPlanType::Selection(selection) => {
351                let input: LogicalPlan =
352                    into_logical_plan!(selection.input, ctx, extension_codec)?;
353                let expr: Expr = selection
354                    .expr
355                    .as_ref()
356                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
357                    .transpose()?
358                    .ok_or_else(|| proto_error("expression required"))?;
359                LogicalPlanBuilder::from(input).filter(expr)?.build()
360            }
361            LogicalPlanType::Window(window) => {
362                let input: LogicalPlan =
363                    into_logical_plan!(window.input, ctx, extension_codec)?;
364                let window_expr =
365                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
366                LogicalPlanBuilder::from(input).window(window_expr)?.build()
367            }
368            LogicalPlanType::Aggregate(aggregate) => {
369                let input: LogicalPlan =
370                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
371                let group_expr =
372                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
373                let aggr_expr =
374                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
375                LogicalPlanBuilder::from(input)
376                    .aggregate(group_expr, aggr_expr)?
377                    .build()
378            }
379            LogicalPlanType::ListingScan(scan) => {
380                let schema: Schema = convert_required!(scan.schema)?;
381
382                let mut projection = None;
383                if let Some(columns) = &scan.projection {
384                    let column_indices = columns
385                        .columns
386                        .iter()
387                        .map(|name| schema.index_of(name))
388                        .collect::<Result<Vec<usize>, _>>()?;
389                    projection = Some(column_indices);
390                }
391
392                let filters =
393                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
394
395                let mut all_sort_orders = vec![];
396                for order in &scan.file_sort_order {
397                    all_sort_orders.push(from_proto::parse_sorts(
398                        &order.sort_expr_nodes,
399                        ctx,
400                        extension_codec,
401                    )?)
402                }
403
404                let file_format: Arc<dyn FileFormat> =
405                    match scan.file_format_type.as_ref().ok_or_else(|| {
406                        proto_error(format!(
407                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
408                        ))
409                    })? {
410                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
411                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
412                            #[cfg(feature = "parquet")]
413                            {
414                                let mut parquet = ParquetFormat::default();
415                                if let Some(options) = options {
416                                    parquet = parquet.with_options(options.try_into()?)
417                                }
418                                Arc::new(parquet)
419                            }
420                            #[cfg(not(feature = "parquet"))]
421                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
422                        }
423                        FileFormatType::Csv(protobuf::CsvFormat {
424                            options
425                        }) => {
426                            let mut csv = CsvFormat::default();
427                            if let Some(options) = options {
428                                csv = csv.with_options(options.try_into()?)
429                            }
430                            Arc::new(csv)
431                        },
432                        FileFormatType::Json(protobuf::NdJsonFormat {
433                            options
434                        }) => {
435                            let mut json = OtherNdJsonFormat::default();
436                            if let Some(options) = options {
437                                json = json.with_options(options.try_into()?)
438                            }
439                            Arc::new(json)
440                        }
441                        #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
442                        FileFormatType::Avro(..) => {
443                            #[cfg(feature = "avro")]
444                            {
445                                Arc::new(AvroFormat)
446                            }
447                            #[cfg(not(feature = "avro"))]
448                            panic!("Unable to process avro file since `avro` feature is not enabled");
449                        }
450                        FileFormatType::Arrow(..) => {
451                            Arc::new(ArrowFormat)
452                        }
453                    };
454
455                let table_paths = &scan
456                    .paths
457                    .iter()
458                    .map(ListingTableUrl::parse)
459                    .collect::<Result<Vec<_>, _>>()?;
460
461                let partition_columns = scan
462                    .table_partition_cols
463                    .iter()
464                    .map(|col| {
465                        let Some(arrow_type) = col.arrow_type.as_ref() else {
466                            return Err(proto_error(
467                                "Missing Arrow type in partition columns",
468                            ));
469                        };
470                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
471                            proto_error(format!("Received an unknown ArrowType: {e}"))
472                        })?;
473                        Ok((col.name.clone(), arrow_type))
474                    })
475                    .collect::<Result<Vec<_>>>()?;
476
477                let options = ListingOptions::new(file_format)
478                    .with_file_extension(&scan.file_extension)
479                    .with_table_partition_cols(partition_columns)
480                    .with_collect_stat(scan.collect_stat)
481                    .with_target_partitions(scan.target_partitions as usize)
482                    .with_file_sort_order(all_sort_orders);
483
484                let config =
485                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
486                        .with_listing_options(options)
487                        .with_schema(Arc::new(schema));
488
489                let provider = ListingTable::try_new(config)?.with_cache(
490                    ctx.state()
491                        .runtime_env()
492                        .cache_manager
493                        .get_file_statistic_cache(),
494                );
495
496                let table_name =
497                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
498
499                LogicalPlanBuilder::scan_with_filters(
500                    table_name,
501                    provider_as_source(Arc::new(provider)),
502                    projection,
503                    filters,
504                )?
505                .build()
506            }
507            LogicalPlanType::CustomScan(scan) => {
508                let schema: Schema = convert_required!(scan.schema)?;
509                let schema = Arc::new(schema);
510                let mut projection = None;
511                if let Some(columns) = &scan.projection {
512                    let column_indices = columns
513                        .columns
514                        .iter()
515                        .map(|name| schema.index_of(name))
516                        .collect::<Result<Vec<usize>, _>>()?;
517                    projection = Some(column_indices);
518                }
519
520                let filters =
521                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
522
523                let table_name =
524                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
525
526                let provider = extension_codec.try_decode_table_provider(
527                    &scan.custom_table_data,
528                    &table_name,
529                    schema,
530                    ctx,
531                )?;
532
533                LogicalPlanBuilder::scan_with_filters(
534                    table_name,
535                    provider_as_source(provider),
536                    projection,
537                    filters,
538                )?
539                .build()
540            }
541            LogicalPlanType::Sort(sort) => {
542                let input: LogicalPlan =
543                    into_logical_plan!(sort.input, ctx, extension_codec)?;
544                let sort_expr: Vec<SortExpr> =
545                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
546                let fetch: Option<usize> = sort.fetch.try_into().ok();
547                LogicalPlanBuilder::from(input)
548                    .sort_with_limit(sort_expr, fetch)?
549                    .build()
550            }
551            LogicalPlanType::Repartition(repartition) => {
552                use datafusion::logical_expr::Partitioning;
553                let input: LogicalPlan =
554                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
555                use protobuf::repartition_node::PartitionMethod;
556                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
557                    internal_datafusion_err!(
558                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
559                    )
560                })?;
561
562                let partitioning_scheme = match pb_partition_method {
563                    PartitionMethod::Hash(protobuf::HashRepartition {
564                        hash_expr: pb_hash_expr,
565                        partition_count,
566                    }) => Partitioning::Hash(
567                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
568                        *partition_count as usize,
569                    ),
570                    PartitionMethod::RoundRobin(partition_count) => {
571                        Partitioning::RoundRobinBatch(*partition_count as usize)
572                    }
573                };
574
575                LogicalPlanBuilder::from(input)
576                    .repartition(partitioning_scheme)?
577                    .build()
578            }
579            LogicalPlanType::EmptyRelation(empty_relation) => {
580                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
581            }
582            LogicalPlanType::CreateExternalTable(create_extern_table) => {
583                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
584                    DataFusionError::Internal(String::from(
585                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
586                    ))
587                })?;
588
589                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
590                    DataFusionError::Internal(String::from(
591                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
592                    ))
593                })?;
594                let definition = if !create_extern_table.definition.is_empty() {
595                    Some(create_extern_table.definition.clone())
596                } else {
597                    None
598                };
599
600                let file_type = create_extern_table.file_type.as_str();
601                if ctx.table_factory(file_type).is_none() {
602                    internal_err!("No TableProviderFactory for file type: {file_type}")?
603                }
604
605                let mut order_exprs = vec![];
606                for expr in &create_extern_table.order_exprs {
607                    order_exprs.push(from_proto::parse_sorts(
608                        &expr.sort_expr_nodes,
609                        ctx,
610                        extension_codec,
611                    )?);
612                }
613
614                let mut column_defaults =
615                    HashMap::with_capacity(create_extern_table.column_defaults.len());
616                for (col_name, expr) in &create_extern_table.column_defaults {
617                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
618                    column_defaults.insert(col_name.clone(), expr);
619                }
620
621                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
622                    CreateExternalTable {
623                        schema: pb_schema.try_into()?,
624                        name: from_table_reference(
625                            create_extern_table.name.as_ref(),
626                            "CreateExternalTable",
627                        )?,
628                        location: create_extern_table.location.clone(),
629                        file_type: create_extern_table.file_type.clone(),
630                        table_partition_cols: create_extern_table
631                            .table_partition_cols
632                            .clone(),
633                        order_exprs,
634                        if_not_exists: create_extern_table.if_not_exists,
635                        temporary: create_extern_table.temporary,
636                        definition,
637                        unbounded: create_extern_table.unbounded,
638                        options: create_extern_table.options.clone(),
639                        constraints: constraints.into(),
640                        column_defaults,
641                    },
642                )))
643            }
644            LogicalPlanType::CreateView(create_view) => {
645                let plan = create_view
646                    .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
647                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
648                )))?
649                    .try_into_logical_plan(ctx, extension_codec)?;
650                let definition = if !create_view.definition.is_empty() {
651                    Some(create_view.definition.clone())
652                } else {
653                    None
654                };
655
656                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
657                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
658                    temporary: create_view.temporary,
659                    input: Arc::new(plan),
660                    or_replace: create_view.or_replace,
661                    definition,
662                })))
663            }
664            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
665                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
666                    DataFusionError::Internal(String::from(
667                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
668                    ))
669                })?;
670
671                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
672                    CreateCatalogSchema {
673                        schema_name: create_catalog_schema.schema_name.clone(),
674                        if_not_exists: create_catalog_schema.if_not_exists,
675                        schema: pb_schema.try_into()?,
676                    },
677                )))
678            }
679            LogicalPlanType::CreateCatalog(create_catalog) => {
680                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
681                    DataFusionError::Internal(String::from(
682                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
683                    ))
684                })?;
685
686                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
687                    CreateCatalog {
688                        catalog_name: create_catalog.catalog_name.clone(),
689                        if_not_exists: create_catalog.if_not_exists,
690                        schema: pb_schema.try_into()?,
691                    },
692                )))
693            }
694            LogicalPlanType::Analyze(analyze) => {
695                let input: LogicalPlan =
696                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
697                LogicalPlanBuilder::from(input)
698                    .explain(analyze.verbose, true)?
699                    .build()
700            }
701            LogicalPlanType::Explain(explain) => {
702                let input: LogicalPlan =
703                    into_logical_plan!(explain.input, ctx, extension_codec)?;
704                LogicalPlanBuilder::from(input)
705                    .explain(explain.verbose, false)?
706                    .build()
707            }
708            LogicalPlanType::SubqueryAlias(aliased_relation) => {
709                let input: LogicalPlan =
710                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
711                let alias = from_table_reference(
712                    aliased_relation.alias.as_ref(),
713                    "SubqueryAlias",
714                )?;
715                LogicalPlanBuilder::from(input).alias(alias)?.build()
716            }
717            LogicalPlanType::Limit(limit) => {
718                let input: LogicalPlan =
719                    into_logical_plan!(limit.input, ctx, extension_codec)?;
720                let skip = limit.skip.max(0) as usize;
721
722                let fetch = if limit.fetch < 0 {
723                    None
724                } else {
725                    Some(limit.fetch as usize)
726                };
727
728                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
729            }
730            LogicalPlanType::Join(join) => {
731                let left_keys: Vec<Expr> =
732                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
733                let right_keys: Vec<Expr> =
734                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
735                let join_type =
736                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
737                        proto_error(format!(
738                            "Received a JoinNode message with unknown JoinType {}",
739                            join.join_type
740                        ))
741                    })?;
742                let join_constraint = protobuf::JoinConstraint::try_from(
743                    join.join_constraint,
744                )
745                .map_err(|_| {
746                    proto_error(format!(
747                        "Received a JoinNode message with unknown JoinConstraint {}",
748                        join.join_constraint
749                    ))
750                })?;
751                let filter: Option<Expr> = join
752                    .filter
753                    .as_ref()
754                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
755                    .map_or(Ok(None), |v| v.map(Some))?;
756
757                let builder = LogicalPlanBuilder::from(into_logical_plan!(
758                    join.left,
759                    ctx,
760                    extension_codec
761                )?);
762                let builder = match join_constraint.into() {
763                    JoinConstraint::On => builder.join_with_expr_keys(
764                        into_logical_plan!(join.right, ctx, extension_codec)?,
765                        join_type.into(),
766                        (left_keys, right_keys),
767                        filter,
768                    )?,
769                    JoinConstraint::Using => {
770                        // The equijoin keys of a USING join must be column references.
771                        let using_keys = left_keys
772                            .into_iter()
773                            .map(|key| {
774                                key.try_as_col().cloned()
775                                    .ok_or_else(|| internal_datafusion_err!(
776                                        "Using join keys must be column references, got: {key:?}"
777                                    ))
778                            })
779                            .collect::<Result<Vec<_>, _>>()?;
780                        builder.join_using(
781                            into_logical_plan!(join.right, ctx, extension_codec)?,
782                            join_type.into(),
783                            using_keys,
784                        )?
785                    }
786                };
787
788                builder.build()
789            }
790            LogicalPlanType::Union(union) => {
791                if union.inputs.len() < 2 {
792                    return  Err( DataFusionError::Internal(String::from(
793                        "Protobuf deserialization error, Union was require at least two input.",
794                    )));
795                }
796                let (first, rest) = union.inputs.split_first().unwrap();
797                let mut builder = LogicalPlanBuilder::from(
798                    first.try_into_logical_plan(ctx, extension_codec)?,
799                );
800
801                for i in rest {
802                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
803                    builder = builder.union(plan)?;
804                }
805                builder.build()
806            }
807            LogicalPlanType::CrossJoin(crossjoin) => {
808                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
809                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
810
811                LogicalPlanBuilder::from(left).cross_join(right)?.build()
812            }
813            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
814                let input_plans: Vec<LogicalPlan> = inputs
815                    .iter()
816                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
817                    .collect::<Result<_>>()?;
818
819                let extension_node =
820                    extension_codec.try_decode(node, &input_plans, ctx)?;
821                Ok(LogicalPlan::Extension(extension_node))
822            }
823            LogicalPlanType::Distinct(distinct) => {
824                let input: LogicalPlan =
825                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
826                LogicalPlanBuilder::from(input).distinct()?.build()
827            }
828            LogicalPlanType::DistinctOn(distinct_on) => {
829                let input: LogicalPlan =
830                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
831                let on_expr =
832                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
833                let select_expr = from_proto::parse_exprs(
834                    &distinct_on.select_expr,
835                    ctx,
836                    extension_codec,
837                )?;
838                let sort_expr = match distinct_on.sort_expr.len() {
839                    0 => None,
840                    _ => Some(from_proto::parse_sorts(
841                        &distinct_on.sort_expr,
842                        ctx,
843                        extension_codec,
844                    )?),
845                };
846                LogicalPlanBuilder::from(input)
847                    .distinct_on(on_expr, select_expr, sort_expr)?
848                    .build()
849            }
850            LogicalPlanType::ViewScan(scan) => {
851                let schema: Schema = convert_required!(scan.schema)?;
852
853                let mut projection = None;
854                if let Some(columns) = &scan.projection {
855                    let column_indices = columns
856                        .columns
857                        .iter()
858                        .map(|name| schema.index_of(name))
859                        .collect::<Result<Vec<usize>, _>>()?;
860                    projection = Some(column_indices);
861                }
862
863                let input: LogicalPlan =
864                    into_logical_plan!(scan.input, ctx, extension_codec)?;
865
866                let definition = if !scan.definition.is_empty() {
867                    Some(scan.definition.clone())
868                } else {
869                    None
870                };
871
872                let provider = ViewTable::new(input, definition);
873
874                let table_name =
875                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
876
877                LogicalPlanBuilder::scan(
878                    table_name,
879                    provider_as_source(Arc::new(provider)),
880                    projection,
881                )?
882                .build()
883            }
884            LogicalPlanType::Prepare(prepare) => {
885                let input: LogicalPlan =
886                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
887                let data_types: Vec<DataType> = prepare
888                    .data_types
889                    .iter()
890                    .map(DataType::try_from)
891                    .collect::<Result<_, _>>()?;
892                LogicalPlanBuilder::from(input)
893                    .prepare(prepare.name.clone(), data_types)?
894                    .build()
895            }
896            LogicalPlanType::DropView(dropview) => {
897                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
898                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
899                    if_exists: dropview.if_exists,
900                    schema: Arc::new(convert_required!(dropview.schema)?),
901                })))
902            }
903            LogicalPlanType::CopyTo(copy) => {
904                let input: LogicalPlan =
905                    into_logical_plan!(copy.input, ctx, extension_codec)?;
906
907                let file_type: Arc<dyn FileType> = format_as_file_type(
908                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
909                );
910
911                Ok(LogicalPlan::Copy(dml::CopyTo::new(
912                    Arc::new(input),
913                    copy.output_url.clone(),
914                    copy.partition_by.clone(),
915                    file_type,
916                    Default::default(),
917                )))
918            }
919            LogicalPlanType::Unnest(unnest) => {
920                let input: LogicalPlan =
921                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
922
923                LogicalPlanBuilder::from(input)
924                    .unnest_columns_with_options(
925                        unnest.exec_columns.iter().map(|c| c.into()).collect(),
926                        into_required!(unnest.options)?,
927                    )?
928                    .build()
929            }
930            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
931                let static_term = recursive_query_node
932                    .static_term
933                    .as_ref()
934                    .ok_or_else(|| DataFusionError::Internal(String::from(
935                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
936                    )))?
937                    .try_into_logical_plan(ctx, extension_codec)?;
938
939                let recursive_term = recursive_query_node
940                    .recursive_term
941                    .as_ref()
942                    .ok_or_else(|| DataFusionError::Internal(String::from(
943                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
944                    )))?
945                    .try_into_logical_plan(ctx, extension_codec)?;
946
947                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
948                    name: recursive_query_node.name.clone(),
949                    static_term: Arc::new(static_term),
950                    recursive_term: Arc::new(recursive_term),
951                    is_distinct: recursive_query_node.is_distinct,
952                }))
953            }
954            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
955                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
956                let schema = convert_required!(*schema)?;
957                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
958                LogicalPlanBuilder::scan(
959                    name.as_str(),
960                    provider_as_source(Arc::new(cte_work_table)),
961                    None,
962                )?
963                .build()
964            }
965            LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
966                datafusion::logical_expr::DmlStatement::new(
967                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
968                    to_table_source(&dml_node.target, ctx, extension_codec)?,
969                    dml_node.dml_type().into(),
970                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
971                ),
972            )),
973        }
974    }
975
976    fn try_from_logical_plan(
977        plan: &LogicalPlan,
978        extension_codec: &dyn LogicalExtensionCodec,
979    ) -> Result<Self>
980    where
981        Self: Sized,
982    {
983        match plan {
984            LogicalPlan::Values(Values { values, .. }) => {
985                let n_cols = if values.is_empty() {
986                    0
987                } else {
988                    values[0].len()
989                } as u64;
990                let values_list =
991                    serialize_exprs(values.iter().flatten(), extension_codec)?;
992                Ok(LogicalPlanNode {
993                    logical_plan_type: Some(LogicalPlanType::Values(
994                        protobuf::ValuesNode {
995                            n_cols,
996                            values_list,
997                        },
998                    )),
999                })
1000            }
1001            LogicalPlan::TableScan(TableScan {
1002                table_name,
1003                source,
1004                filters,
1005                projection,
1006                ..
1007            }) => {
1008                let provider = source_as_provider(source)?;
1009                let schema = provider.schema();
1010                let source = provider.as_any();
1011
1012                let projection = match projection {
1013                    None => None,
1014                    Some(columns) => {
1015                        let column_names = columns
1016                            .iter()
1017                            .map(|i| schema.field(*i).name().to_owned())
1018                            .collect();
1019                        Some(protobuf::ProjectionColumns {
1020                            columns: column_names,
1021                        })
1022                    }
1023                };
1024
1025                let filters: Vec<protobuf::LogicalExprNode> =
1026                    serialize_exprs(filters, extension_codec)?;
1027
1028                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1029                    let any = listing_table.options().format.as_any();
1030                    let file_format_type = {
1031                        let mut maybe_some_type = None;
1032
1033                        #[cfg(feature = "parquet")]
1034                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1035                            let options = parquet.options();
1036                            maybe_some_type =
1037                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1038                                    options: Some(options.try_into()?),
1039                                }));
1040                        };
1041
1042                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1043                            let options = csv.options();
1044                            maybe_some_type =
1045                                Some(FileFormatType::Csv(protobuf::CsvFormat {
1046                                    options: Some(options.try_into()?),
1047                                }));
1048                        }
1049
1050                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1051                            let options = json.options();
1052                            maybe_some_type =
1053                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
1054                                    options: Some(options.try_into()?),
1055                                }))
1056                        }
1057
1058                        #[cfg(feature = "avro")]
1059                        if any.is::<AvroFormat>() {
1060                            maybe_some_type =
1061                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1062                        }
1063
1064                        if any.is::<ArrowFormat>() {
1065                            maybe_some_type =
1066                                Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1067                        }
1068
1069                        if let Some(file_format_type) = maybe_some_type {
1070                            file_format_type
1071                        } else {
1072                            return Err(proto_error(format!(
1073                                "Error deserializing unknown file format: {:?}",
1074                                listing_table.options().format
1075                            )));
1076                        }
1077                    };
1078
1079                    let options = listing_table.options();
1080
1081                    let mut builder = SchemaBuilder::from(schema.as_ref());
1082                    for (idx, field) in schema.fields().iter().enumerate().rev() {
1083                        if options
1084                            .table_partition_cols
1085                            .iter()
1086                            .any(|(name, _)| name == field.name())
1087                        {
1088                            builder.remove(idx);
1089                        }
1090                    }
1091
1092                    let schema = builder.finish();
1093
1094                    let schema: protobuf::Schema = (&schema).try_into()?;
1095
1096                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1097                    for order in &options.file_sort_order {
1098                        let expr_vec = SortExprNodeCollection {
1099                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1100                        };
1101                        exprs_vec.push(expr_vec);
1102                    }
1103
1104                    let partition_columns = options
1105                        .table_partition_cols
1106                        .iter()
1107                        .map(|(name, arrow_type)| {
1108                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1109                                .map_err(|e| {
1110                                    proto_error(format!(
1111                                        "Received an unknown ArrowType: {e}"
1112                                    ))
1113                                })?;
1114                            Ok(protobuf::PartitionColumn {
1115                                name: name.clone(),
1116                                arrow_type: Some(arrow_type),
1117                            })
1118                        })
1119                        .collect::<Result<Vec<_>>>()?;
1120
1121                    Ok(LogicalPlanNode {
1122                        logical_plan_type: Some(LogicalPlanType::ListingScan(
1123                            protobuf::ListingTableScanNode {
1124                                file_format_type: Some(file_format_type),
1125                                table_name: Some(table_name.clone().into()),
1126                                collect_stat: options.collect_stat,
1127                                file_extension: options.file_extension.clone(),
1128                                table_partition_cols: partition_columns,
1129                                paths: listing_table
1130                                    .table_paths()
1131                                    .iter()
1132                                    .map(|x| x.to_string())
1133                                    .collect(),
1134                                schema: Some(schema),
1135                                projection,
1136                                filters,
1137                                target_partitions: options.target_partitions as u32,
1138                                file_sort_order: exprs_vec,
1139                            },
1140                        )),
1141                    })
1142                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1143                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1144                    Ok(LogicalPlanNode {
1145                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1146                            protobuf::ViewTableScanNode {
1147                                table_name: Some(table_name.clone().into()),
1148                                input: Some(Box::new(
1149                                    LogicalPlanNode::try_from_logical_plan(
1150                                        view_table.logical_plan(),
1151                                        extension_codec,
1152                                    )?,
1153                                )),
1154                                schema: Some(schema),
1155                                projection,
1156                                definition: view_table
1157                                    .definition()
1158                                    .map(|s| s.to_string())
1159                                    .unwrap_or_default(),
1160                            },
1161                        ))),
1162                    })
1163                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1164                {
1165                    let name = cte_work_table.name().to_string();
1166                    let schema = cte_work_table.schema();
1167                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1168
1169                    Ok(LogicalPlanNode {
1170                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1171                            protobuf::CteWorkTableScanNode {
1172                                name,
1173                                schema: Some(schema),
1174                            },
1175                        )),
1176                    })
1177                } else {
1178                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1179                    let mut bytes = vec![];
1180                    extension_codec
1181                        .try_encode_table_provider(table_name, provider, &mut bytes)
1182                        .map_err(|e| context!("Error serializing custom table", e))?;
1183                    let scan = CustomScan(CustomTableScanNode {
1184                        table_name: Some(table_name.clone().into()),
1185                        projection,
1186                        schema: Some(schema),
1187                        filters,
1188                        custom_table_data: bytes,
1189                    });
1190                    let node = LogicalPlanNode {
1191                        logical_plan_type: Some(scan),
1192                    };
1193                    Ok(node)
1194                }
1195            }
1196            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1197                Ok(LogicalPlanNode {
1198                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1199                        protobuf::ProjectionNode {
1200                            input: Some(Box::new(
1201                                LogicalPlanNode::try_from_logical_plan(
1202                                    input.as_ref(),
1203                                    extension_codec,
1204                                )?,
1205                            )),
1206                            expr: serialize_exprs(expr, extension_codec)?,
1207                            optional_alias: None,
1208                        },
1209                    ))),
1210                })
1211            }
1212            LogicalPlan::Filter(filter) => {
1213                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1214                    filter.input.as_ref(),
1215                    extension_codec,
1216                )?;
1217                Ok(LogicalPlanNode {
1218                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1219                        protobuf::SelectionNode {
1220                            input: Some(Box::new(input)),
1221                            expr: Some(serialize_expr(
1222                                &filter.predicate,
1223                                extension_codec,
1224                            )?),
1225                        },
1226                    ))),
1227                })
1228            }
1229            LogicalPlan::Distinct(Distinct::All(input)) => {
1230                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1231                    input.as_ref(),
1232                    extension_codec,
1233                )?;
1234                Ok(LogicalPlanNode {
1235                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1236                        protobuf::DistinctNode {
1237                            input: Some(Box::new(input)),
1238                        },
1239                    ))),
1240                })
1241            }
1242            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1243                on_expr,
1244                select_expr,
1245                sort_expr,
1246                input,
1247                ..
1248            })) => {
1249                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1250                    input.as_ref(),
1251                    extension_codec,
1252                )?;
1253                let sort_expr = match sort_expr {
1254                    None => vec![],
1255                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1256                };
1257                Ok(LogicalPlanNode {
1258                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1259                        protobuf::DistinctOnNode {
1260                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1261                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1262                            sort_expr,
1263                            input: Some(Box::new(input)),
1264                        },
1265                    ))),
1266                })
1267            }
1268            LogicalPlan::Window(Window {
1269                input, window_expr, ..
1270            }) => {
1271                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1272                    input.as_ref(),
1273                    extension_codec,
1274                )?;
1275                Ok(LogicalPlanNode {
1276                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1277                        protobuf::WindowNode {
1278                            input: Some(Box::new(input)),
1279                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1280                        },
1281                    ))),
1282                })
1283            }
1284            LogicalPlan::Aggregate(Aggregate {
1285                group_expr,
1286                aggr_expr,
1287                input,
1288                ..
1289            }) => {
1290                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1291                    input.as_ref(),
1292                    extension_codec,
1293                )?;
1294                Ok(LogicalPlanNode {
1295                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1296                        protobuf::AggregateNode {
1297                            input: Some(Box::new(input)),
1298                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1299                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1300                        },
1301                    ))),
1302                })
1303            }
1304            LogicalPlan::Join(Join {
1305                left,
1306                right,
1307                on,
1308                filter,
1309                join_type,
1310                join_constraint,
1311                null_equality,
1312                ..
1313            }) => {
1314                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1315                    left.as_ref(),
1316                    extension_codec,
1317                )?;
1318                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1319                    right.as_ref(),
1320                    extension_codec,
1321                )?;
1322                let (left_join_key, right_join_key) = on
1323                    .iter()
1324                    .map(|(l, r)| {
1325                        Ok((
1326                            serialize_expr(l, extension_codec)?,
1327                            serialize_expr(r, extension_codec)?,
1328                        ))
1329                    })
1330                    .collect::<Result<Vec<_>, ToProtoError>>()?
1331                    .into_iter()
1332                    .unzip();
1333                let join_type: protobuf::JoinType = join_type.to_owned().into();
1334                let join_constraint: protobuf::JoinConstraint =
1335                    join_constraint.to_owned().into();
1336                let null_equality: protobuf::NullEquality =
1337                    null_equality.to_owned().into();
1338                let filter = filter
1339                    .as_ref()
1340                    .map(|e| serialize_expr(e, extension_codec))
1341                    .map_or(Ok(None), |v| v.map(Some))?;
1342                Ok(LogicalPlanNode {
1343                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1344                        protobuf::JoinNode {
1345                            left: Some(Box::new(left)),
1346                            right: Some(Box::new(right)),
1347                            join_type: join_type.into(),
1348                            join_constraint: join_constraint.into(),
1349                            left_join_key,
1350                            right_join_key,
1351                            null_equality: null_equality.into(),
1352                            filter,
1353                        },
1354                    ))),
1355                })
1356            }
1357            LogicalPlan::Subquery(_) => {
1358                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1359            }
1360            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1361                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1362                    input.as_ref(),
1363                    extension_codec,
1364                )?;
1365                Ok(LogicalPlanNode {
1366                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1367                        protobuf::SubqueryAliasNode {
1368                            input: Some(Box::new(input)),
1369                            alias: Some((*alias).clone().into()),
1370                        },
1371                    ))),
1372                })
1373            }
1374            LogicalPlan::Limit(limit) => {
1375                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1376                    limit.input.as_ref(),
1377                    extension_codec,
1378                )?;
1379                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1380                    return Err(proto_error(
1381                        "LogicalPlan::Limit only supports literal skip values",
1382                    ));
1383                };
1384                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1385                    return Err(proto_error(
1386                        "LogicalPlan::Limit only supports literal fetch values",
1387                    ));
1388                };
1389
1390                Ok(LogicalPlanNode {
1391                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1392                        protobuf::LimitNode {
1393                            input: Some(Box::new(input)),
1394                            skip: skip as i64,
1395                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1396                        },
1397                    ))),
1398                })
1399            }
1400            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1401                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1402                    input.as_ref(),
1403                    extension_codec,
1404                )?;
1405                let sort_expr: Vec<protobuf::SortExprNode> =
1406                    serialize_sorts(expr, extension_codec)?;
1407                Ok(LogicalPlanNode {
1408                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1409                        protobuf::SortNode {
1410                            input: Some(Box::new(input)),
1411                            expr: sort_expr,
1412                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1413                        },
1414                    ))),
1415                })
1416            }
1417            LogicalPlan::Repartition(Repartition {
1418                input,
1419                partitioning_scheme,
1420            }) => {
1421                use datafusion::logical_expr::Partitioning;
1422                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1423                    input.as_ref(),
1424                    extension_codec,
1425                )?;
1426
1427                // Assumed common usize field was batch size
1428                // Used u64 to avoid any nastiness involving large values; most data clusters are probably uniformly 64-bit anyway
1429                use protobuf::repartition_node::PartitionMethod;
1430
1431                let pb_partition_method = match partitioning_scheme {
1432                    Partitioning::Hash(exprs, partition_count) => {
1433                        PartitionMethod::Hash(protobuf::HashRepartition {
1434                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1435                            partition_count: *partition_count as u64,
1436                        })
1437                    }
1438                    Partitioning::RoundRobinBatch(partition_count) => {
1439                        PartitionMethod::RoundRobin(*partition_count as u64)
1440                    }
1441                    Partitioning::DistributeBy(_) => {
1442                        return not_impl_err!("DistributeBy")
1443                    }
1444                };
1445
1446                Ok(LogicalPlanNode {
1447                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1448                        protobuf::RepartitionNode {
1449                            input: Some(Box::new(input)),
1450                            partition_method: Some(pb_partition_method),
1451                        },
1452                    ))),
1453                })
1454            }
1455            LogicalPlan::EmptyRelation(EmptyRelation {
1456                produce_one_row, ..
1457            }) => Ok(LogicalPlanNode {
1458                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1459                    protobuf::EmptyRelationNode {
1460                        produce_one_row: *produce_one_row,
1461                    },
1462                )),
1463            }),
1464            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1465                CreateExternalTable {
1466                    name,
1467                    location,
1468                    file_type,
1469                    schema: df_schema,
1470                    table_partition_cols,
1471                    if_not_exists,
1472                    definition,
1473                    order_exprs,
1474                    unbounded,
1475                    options,
1476                    constraints,
1477                    column_defaults,
1478                    temporary,
1479                },
1480            )) => {
1481                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1482                for order in order_exprs {
1483                    let temp = SortExprNodeCollection {
1484                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1485                    };
1486                    converted_order_exprs.push(temp);
1487                }
1488
1489                let mut converted_column_defaults =
1490                    HashMap::with_capacity(column_defaults.len());
1491                for (col_name, expr) in column_defaults {
1492                    converted_column_defaults
1493                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1494                }
1495
1496                Ok(LogicalPlanNode {
1497                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1498                        protobuf::CreateExternalTableNode {
1499                            name: Some(name.clone().into()),
1500                            location: location.clone(),
1501                            file_type: file_type.clone(),
1502                            schema: Some(df_schema.try_into()?),
1503                            table_partition_cols: table_partition_cols.clone(),
1504                            if_not_exists: *if_not_exists,
1505                            temporary: *temporary,
1506                            order_exprs: converted_order_exprs,
1507                            definition: definition.clone().unwrap_or_default(),
1508                            unbounded: *unbounded,
1509                            options: options.clone(),
1510                            constraints: Some(constraints.clone().into()),
1511                            column_defaults: converted_column_defaults,
1512                        },
1513                    )),
1514                })
1515            }
1516            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1517                name,
1518                input,
1519                or_replace,
1520                definition,
1521                temporary,
1522            })) => Ok(LogicalPlanNode {
1523                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1524                    protobuf::CreateViewNode {
1525                        name: Some(name.clone().into()),
1526                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1527                            input,
1528                            extension_codec,
1529                        )?)),
1530                        or_replace: *or_replace,
1531                        temporary: *temporary,
1532                        definition: definition.clone().unwrap_or_default(),
1533                    },
1534                ))),
1535            }),
1536            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1537                CreateCatalogSchema {
1538                    schema_name,
1539                    if_not_exists,
1540                    schema: df_schema,
1541                },
1542            )) => Ok(LogicalPlanNode {
1543                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1544                    protobuf::CreateCatalogSchemaNode {
1545                        schema_name: schema_name.clone(),
1546                        if_not_exists: *if_not_exists,
1547                        schema: Some(df_schema.try_into()?),
1548                    },
1549                )),
1550            }),
1551            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1552                catalog_name,
1553                if_not_exists,
1554                schema: df_schema,
1555            })) => Ok(LogicalPlanNode {
1556                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1557                    protobuf::CreateCatalogNode {
1558                        catalog_name: catalog_name.clone(),
1559                        if_not_exists: *if_not_exists,
1560                        schema: Some(df_schema.try_into()?),
1561                    },
1562                )),
1563            }),
1564            LogicalPlan::Analyze(a) => {
1565                let input = LogicalPlanNode::try_from_logical_plan(
1566                    a.input.as_ref(),
1567                    extension_codec,
1568                )?;
1569                Ok(LogicalPlanNode {
1570                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1571                        protobuf::AnalyzeNode {
1572                            input: Some(Box::new(input)),
1573                            verbose: a.verbose,
1574                        },
1575                    ))),
1576                })
1577            }
1578            LogicalPlan::Explain(a) => {
1579                let input = LogicalPlanNode::try_from_logical_plan(
1580                    a.plan.as_ref(),
1581                    extension_codec,
1582                )?;
1583                Ok(LogicalPlanNode {
1584                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1585                        protobuf::ExplainNode {
1586                            input: Some(Box::new(input)),
1587                            verbose: a.verbose,
1588                        },
1589                    ))),
1590                })
1591            }
1592            LogicalPlan::Union(union) => {
1593                let inputs: Vec<LogicalPlanNode> = union
1594                    .inputs
1595                    .iter()
1596                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1597                    .collect::<Result<_>>()?;
1598                Ok(LogicalPlanNode {
1599                    logical_plan_type: Some(LogicalPlanType::Union(
1600                        protobuf::UnionNode { inputs },
1601                    )),
1602                })
1603            }
1604            LogicalPlan::Extension(extension) => {
1605                let mut buf: Vec<u8> = vec![];
1606                extension_codec.try_encode(extension, &mut buf)?;
1607
1608                let inputs: Vec<LogicalPlanNode> = extension
1609                    .node
1610                    .inputs()
1611                    .iter()
1612                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1613                    .collect::<Result<_>>()?;
1614
1615                Ok(LogicalPlanNode {
1616                    logical_plan_type: Some(LogicalPlanType::Extension(
1617                        LogicalExtensionNode { node: buf, inputs },
1618                    )),
1619                })
1620            }
1621            LogicalPlan::Statement(Statement::Prepare(Prepare {
1622                name,
1623                data_types,
1624                input,
1625            })) => {
1626                let input =
1627                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1628                Ok(LogicalPlanNode {
1629                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1630                        protobuf::PrepareNode {
1631                            name: name.clone(),
1632                            data_types: data_types
1633                                .iter()
1634                                .map(|t| t.try_into())
1635                                .collect::<Result<Vec<_>, _>>()?,
1636                            input: Some(Box::new(input)),
1637                        },
1638                    ))),
1639                })
1640            }
1641            LogicalPlan::Unnest(Unnest {
1642                input,
1643                exec_columns,
1644                list_type_columns,
1645                struct_type_columns,
1646                dependency_indices,
1647                schema,
1648                options,
1649            }) => {
1650                let input =
1651                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1652                let proto_unnest_list_items = list_type_columns
1653                    .iter()
1654                    .map(|(index, ul)| ColumnUnnestListItem {
1655                        input_index: *index as _,
1656                        recursion: Some(ColumnUnnestListRecursion {
1657                            output_column: Some(ul.output_column.to_owned().into()),
1658                            depth: ul.depth as _,
1659                        }),
1660                    })
1661                    .collect();
1662                Ok(LogicalPlanNode {
1663                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1664                        protobuf::UnnestNode {
1665                            input: Some(Box::new(input)),
1666                            exec_columns: exec_columns
1667                                .iter()
1668                                .map(|col| col.into())
1669                                .collect(),
1670                            list_type_columns: proto_unnest_list_items,
1671                            struct_type_columns: struct_type_columns
1672                                .iter()
1673                                .map(|c| *c as u64)
1674                                .collect(),
1675                            dependency_indices: dependency_indices
1676                                .iter()
1677                                .map(|c| *c as u64)
1678                                .collect(),
1679                            schema: Some(schema.try_into()?),
1680                            options: Some(options.into()),
1681                        },
1682                    ))),
1683                })
1684            }
1685            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1686                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1687            )),
1688            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1689                "LogicalPlan serde is not yet implemented for CreateIndex",
1690            )),
1691            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1692                "LogicalPlan serde is not yet implemented for DropTable",
1693            )),
1694            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1695                name,
1696                if_exists,
1697                schema,
1698            })) => Ok(LogicalPlanNode {
1699                logical_plan_type: Some(LogicalPlanType::DropView(
1700                    protobuf::DropViewNode {
1701                        name: Some(name.clone().into()),
1702                        if_exists: *if_exists,
1703                        schema: Some(schema.try_into()?),
1704                    },
1705                )),
1706            }),
1707            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1708                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1709            )),
1710            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1711                "LogicalPlan serde is not yet implemented for CreateFunction",
1712            )),
1713            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1714                "LogicalPlan serde is not yet implemented for DropFunction",
1715            )),
1716            LogicalPlan::Statement(_) => Err(proto_error(
1717                "LogicalPlan serde is not yet implemented for Statement",
1718            )),
1719            LogicalPlan::Dml(DmlStatement {
1720                table_name,
1721                target,
1722                op,
1723                input,
1724                ..
1725            }) => {
1726                let input =
1727                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1728                let dml_type: dml_node::Type = op.into();
1729                Ok(LogicalPlanNode {
1730                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1731                        input: Some(Box::new(input)),
1732                        target: Some(Box::new(from_table_source(
1733                            table_name.clone(),
1734                            Arc::clone(target),
1735                            extension_codec,
1736                        )?)),
1737                        table_name: Some(table_name.clone().into()),
1738                        dml_type: dml_type.into(),
1739                    }))),
1740                })
1741            }
1742            LogicalPlan::Copy(dml::CopyTo {
1743                input,
1744                output_url,
1745                file_type,
1746                partition_by,
1747                ..
1748            }) => {
1749                let input =
1750                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1751                let mut buf = Vec::new();
1752                extension_codec
1753                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1754
1755                Ok(LogicalPlanNode {
1756                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1757                        protobuf::CopyToNode {
1758                            input: Some(Box::new(input)),
1759                            output_url: output_url.to_string(),
1760                            file_type: buf,
1761                            partition_by: partition_by.clone(),
1762                        },
1763                    ))),
1764                })
1765            }
1766            LogicalPlan::DescribeTable(_) => Err(proto_error(
1767                "LogicalPlan serde is not yet implemented for DescribeTable",
1768            )),
1769            LogicalPlan::RecursiveQuery(recursive) => {
1770                let static_term = LogicalPlanNode::try_from_logical_plan(
1771                    recursive.static_term.as_ref(),
1772                    extension_codec,
1773                )?;
1774                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1775                    recursive.recursive_term.as_ref(),
1776                    extension_codec,
1777                )?;
1778
1779                Ok(LogicalPlanNode {
1780                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1781                        protobuf::RecursiveQueryNode {
1782                            name: recursive.name.clone(),
1783                            static_term: Some(Box::new(static_term)),
1784                            recursive_term: Some(Box::new(recursive_term)),
1785                            is_distinct: recursive.is_distinct,
1786                        },
1787                    ))),
1788                })
1789            }
1790        }
1791    }
1792}