datafusion_proto/logical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, listing_table_scan_node::FileFormatType,
31        logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32    },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "avro")]
39use datafusion::datasource::file_format::avro::AvroFormat;
40#[cfg(feature = "parquet")]
41use datafusion::datasource::file_format::parquet::ParquetFormat;
42use datafusion::datasource::file_format::{
43    file_type_to_format, format_as_file_type, FileFormatFactory,
44};
45use datafusion::{
46    datasource::{
47        file_format::{
48            csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
49        },
50        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
51        view::ViewTable,
52        TableProvider,
53    },
54    datasource::{provider_as_source, source_as_provider},
55    prelude::SessionContext,
56};
57use datafusion_common::file_options::file_type::FileType;
58use datafusion_common::{
59    context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
60    DataFusionError, Result, TableReference, ToDFSchema,
61};
62use datafusion_expr::{
63    dml,
64    logical_plan::{
65        builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
66        CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
67        Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
68        SubqueryAlias, TableScan, Values, Window,
69    },
70    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
71    Statement, WindowUDF,
72};
73use datafusion_expr::{
74    AggregateUDF, ColumnUnnestList, DmlStatement, FetchType, RecursiveQuery, SkipType,
75    TableSource, Unnest,
76};
77
78use self::to_proto::{serialize_expr, serialize_exprs};
79use crate::logical_plan::to_proto::serialize_sorts;
80use prost::bytes::BufMut;
81use prost::Message;
82
83pub mod file_formats;
84pub mod from_proto;
85pub mod to_proto;
86
87pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
88    fn try_decode(buf: &[u8]) -> Result<Self>
89    where
90        Self: Sized;
91
92    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
93    where
94        B: BufMut,
95        Self: Sized;
96
97    fn try_into_logical_plan(
98        &self,
99        ctx: &SessionContext,
100        extension_codec: &dyn LogicalExtensionCodec,
101    ) -> Result<LogicalPlan>;
102
103    fn try_from_logical_plan(
104        plan: &LogicalPlan,
105        extension_codec: &dyn LogicalExtensionCodec,
106    ) -> Result<Self>
107    where
108        Self: Sized;
109}
110
111pub trait LogicalExtensionCodec: Debug + Send + Sync {
112    fn try_decode(
113        &self,
114        buf: &[u8],
115        inputs: &[LogicalPlan],
116        ctx: &SessionContext,
117    ) -> Result<Extension>;
118
119    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
120
121    fn try_decode_table_provider(
122        &self,
123        buf: &[u8],
124        table_ref: &TableReference,
125        schema: SchemaRef,
126        ctx: &SessionContext,
127    ) -> Result<Arc<dyn TableProvider>>;
128
129    fn try_encode_table_provider(
130        &self,
131        table_ref: &TableReference,
132        node: Arc<dyn TableProvider>,
133        buf: &mut Vec<u8>,
134    ) -> Result<()>;
135
136    fn try_decode_file_format(
137        &self,
138        _buf: &[u8],
139        _ctx: &SessionContext,
140    ) -> Result<Arc<dyn FileFormatFactory>> {
141        not_impl_err!("LogicalExtensionCodec is not provided for file format")
142    }
143
144    fn try_encode_file_format(
145        &self,
146        _buf: &mut Vec<u8>,
147        _node: Arc<dyn FileFormatFactory>,
148    ) -> Result<()> {
149        Ok(())
150    }
151
152    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
153        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
154    }
155
156    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
157        Ok(())
158    }
159
160    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
161        not_impl_err!(
162            "LogicalExtensionCodec is not provided for aggregate function {name}"
163        )
164    }
165
166    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
167        Ok(())
168    }
169
170    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
171        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
172    }
173
174    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
175        Ok(())
176    }
177}
178
179#[derive(Debug, Clone)]
180pub struct DefaultLogicalExtensionCodec {}
181
182impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
183    fn try_decode(
184        &self,
185        _buf: &[u8],
186        _inputs: &[LogicalPlan],
187        _ctx: &SessionContext,
188    ) -> Result<Extension> {
189        not_impl_err!("LogicalExtensionCodec is not provided")
190    }
191
192    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
193        not_impl_err!("LogicalExtensionCodec is not provided")
194    }
195
196    fn try_decode_table_provider(
197        &self,
198        _buf: &[u8],
199        _table_ref: &TableReference,
200        _schema: SchemaRef,
201        _ctx: &SessionContext,
202    ) -> Result<Arc<dyn TableProvider>> {
203        not_impl_err!("LogicalExtensionCodec is not provided")
204    }
205
206    fn try_encode_table_provider(
207        &self,
208        _table_ref: &TableReference,
209        _node: Arc<dyn TableProvider>,
210        _buf: &mut Vec<u8>,
211    ) -> Result<()> {
212        not_impl_err!("LogicalExtensionCodec is not provided")
213    }
214}
215
/// Decodes the plan node stored in an optional protobuf child field into a
/// `LogicalPlan`.
///
/// * `$PB` is the optional field (typically an `Option<Box<LogicalPlanNode>>`).
///   Its `as_ref()` must yield an `Option` whose payload `as_ref()`s to an
///   `AsLogicalPlan` implementor.
/// * `$CTX` is the `&SessionContext` and `$CODEC` the
///   `&dyn LogicalExtensionCodec` forwarded to `try_into_logical_plan`.
///
/// Evaluates to `Err(proto_error("Missing required field in protobuf"))` when
/// the field is absent.
///
/// The macro is exported, so every helper it needs is named through `$crate`.
/// Call sites do not have to import `AsLogicalPlan` or `proto_error` for the
/// expansion to compile.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            $crate::logical_plan::AsLogicalPlan::try_into_logical_plan(
                field.as_ref(),
                $CTX,
                $CODEC,
            )
        } else {
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
226
227fn from_table_reference(
228    table_ref: Option<&protobuf::TableReference>,
229    error_context: &str,
230) -> Result<TableReference> {
231    let table_ref = table_ref.ok_or_else(|| {
232        DataFusionError::Internal(format!(
233            "Protobuf deserialization error, {error_context} was missing required field name."
234        ))
235    })?;
236
237    Ok(table_ref.clone().try_into()?)
238}
239
240/// Converts [LogicalPlan::TableScan] to [TableSource]
241/// method to be used to deserialize nodes
242/// serialized by [from_table_source]
243fn to_table_source(
244    node: &Option<Box<LogicalPlanNode>>,
245    ctx: &SessionContext,
246    extension_codec: &dyn LogicalExtensionCodec,
247) -> Result<Arc<dyn TableSource>> {
248    if let Some(node) = node {
249        match node.try_into_logical_plan(ctx, extension_codec)? {
250            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
251            _ => plan_err!("expected TableScan node"),
252        }
253    } else {
254        plan_err!("LogicalPlanNode should be provided")
255    }
256}
257
258/// converts [TableSource] to [LogicalPlan::TableScan]
259/// using [LogicalPlan::TableScan] was the best approach to
260/// serialize [TableSource] to [LogicalPlan::TableScan]
261fn from_table_source(
262    table_name: TableReference,
263    target: Arc<dyn TableSource>,
264    extension_codec: &dyn LogicalExtensionCodec,
265) -> Result<LogicalPlanNode> {
266    let projected_schema = target.schema().to_dfschema_ref()?;
267    let r = LogicalPlan::TableScan(TableScan {
268        table_name,
269        source: target,
270        projection: None,
271        projected_schema,
272        filters: vec![],
273        fetch: None,
274    });
275
276    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
277}
278
279impl AsLogicalPlan for LogicalPlanNode {
280    fn try_decode(buf: &[u8]) -> Result<Self>
281    where
282        Self: Sized,
283    {
284        LogicalPlanNode::decode(buf).map_err(|e| {
285            DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
286        })
287    }
288
289    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
290    where
291        B: BufMut,
292        Self: Sized,
293    {
294        self.encode(buf).map_err(|e| {
295            DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
296        })
297    }
298
299    fn try_into_logical_plan(
300        &self,
301        ctx: &SessionContext,
302        extension_codec: &dyn LogicalExtensionCodec,
303    ) -> Result<LogicalPlan> {
304        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
305            proto_error(format!(
306                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
307            ))
308        })?;
309        match plan {
310            LogicalPlanType::Values(values) => {
311                let n_cols = values.n_cols as usize;
312                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
313                    Ok(Vec::new())
314                } else if values.values_list.len() % n_cols != 0 {
315                    internal_err!(
316                        "Invalid values list length, expect {} to be divisible by {}",
317                        values.values_list.len(),
318                        n_cols
319                    )
320                } else {
321                    values
322                        .values_list
323                        .chunks_exact(n_cols)
324                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
325                        .collect::<Result<Vec<_>, _>>()
326                        .map_err(|e| e.into())
327                }?;
328
329                LogicalPlanBuilder::values(values)?.build()
330            }
331            LogicalPlanType::Projection(projection) => {
332                let input: LogicalPlan =
333                    into_logical_plan!(projection.input, ctx, extension_codec)?;
334                let expr: Vec<Expr> =
335                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
336
337                let new_proj = project(input, expr)?;
338                match projection.optional_alias.as_ref() {
339                    Some(a) => match a {
340                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
341                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
342                                Arc::new(new_proj),
343                                alias.clone(),
344                            )?))
345                        }
346                    },
347                    _ => Ok(new_proj),
348                }
349            }
350            LogicalPlanType::Selection(selection) => {
351                let input: LogicalPlan =
352                    into_logical_plan!(selection.input, ctx, extension_codec)?;
353                let expr: Expr = selection
354                    .expr
355                    .as_ref()
356                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
357                    .transpose()?
358                    .ok_or_else(|| proto_error("expression required"))?;
359                LogicalPlanBuilder::from(input).filter(expr)?.build()
360            }
361            LogicalPlanType::Window(window) => {
362                let input: LogicalPlan =
363                    into_logical_plan!(window.input, ctx, extension_codec)?;
364                let window_expr =
365                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
366                LogicalPlanBuilder::from(input).window(window_expr)?.build()
367            }
368            LogicalPlanType::Aggregate(aggregate) => {
369                let input: LogicalPlan =
370                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
371                let group_expr =
372                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
373                let aggr_expr =
374                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
375                LogicalPlanBuilder::from(input)
376                    .aggregate(group_expr, aggr_expr)?
377                    .build()
378            }
379            LogicalPlanType::ListingScan(scan) => {
380                let schema: Schema = convert_required!(scan.schema)?;
381
382                let mut projection = None;
383                if let Some(columns) = &scan.projection {
384                    let column_indices = columns
385                        .columns
386                        .iter()
387                        .map(|name| schema.index_of(name))
388                        .collect::<Result<Vec<usize>, _>>()?;
389                    projection = Some(column_indices);
390                }
391
392                let filters =
393                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
394
395                let mut all_sort_orders = vec![];
396                for order in &scan.file_sort_order {
397                    all_sort_orders.push(from_proto::parse_sorts(
398                        &order.sort_expr_nodes,
399                        ctx,
400                        extension_codec,
401                    )?)
402                }
403
404                let file_format: Arc<dyn FileFormat> =
405                    match scan.file_format_type.as_ref().ok_or_else(|| {
406                        proto_error(format!(
407                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
408                        ))
409                    })? {
410                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
411                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
412                            #[cfg(feature = "parquet")]
413                            {
414                                let mut parquet = ParquetFormat::default();
415                                if let Some(options) = options {
416                                    parquet = parquet.with_options(options.try_into()?)
417                                }
418                                Arc::new(parquet)
419                            }
420                            #[cfg(not(feature = "parquet"))]
421                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
422                        }
423                        FileFormatType::Csv(protobuf::CsvFormat {
424                            options
425                        }) => {
426                            let mut csv = CsvFormat::default();
427                            if let Some(options) = options {
428                                csv = csv.with_options(options.try_into()?)
429                            }
430                            Arc::new(csv)
431                        },
432                        FileFormatType::Json(protobuf::NdJsonFormat {
433                            options
434                        }) => {
435                            let mut json = OtherNdJsonFormat::default();
436                            if let Some(options) = options {
437                                json = json.with_options(options.try_into()?)
438                            }
439                            Arc::new(json)
440                        }
441                        #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
442                        FileFormatType::Avro(..) => {
443                            #[cfg(feature = "avro")] 
444                            {
445                                Arc::new(AvroFormat)
446                            }
447                            #[cfg(not(feature = "avro"))]
448                            panic!("Unable to process avro file since `avro` feature is not enabled");
449                        }
450                    };
451
452                let table_paths = &scan
453                    .paths
454                    .iter()
455                    .map(ListingTableUrl::parse)
456                    .collect::<Result<Vec<_>, _>>()?;
457
458                let partition_columns = scan
459                    .table_partition_cols
460                    .iter()
461                    .map(|col| {
462                        let Some(arrow_type) = col.arrow_type.as_ref() else {
463                            return Err(proto_error(
464                                "Missing Arrow type in partition columns",
465                            ));
466                        };
467                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
468                            proto_error(format!("Received an unknown ArrowType: {e}"))
469                        })?;
470                        Ok((col.name.clone(), arrow_type))
471                    })
472                    .collect::<Result<Vec<_>>>()?;
473
474                let options = ListingOptions::new(file_format)
475                    .with_file_extension(&scan.file_extension)
476                    .with_table_partition_cols(partition_columns)
477                    .with_collect_stat(scan.collect_stat)
478                    .with_target_partitions(scan.target_partitions as usize)
479                    .with_file_sort_order(all_sort_orders);
480
481                let config =
482                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
483                        .with_listing_options(options)
484                        .with_schema(Arc::new(schema));
485
486                let provider = ListingTable::try_new(config)?.with_cache(
487                    ctx.state()
488                        .runtime_env()
489                        .cache_manager
490                        .get_file_statistic_cache(),
491                );
492
493                let table_name =
494                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
495
496                LogicalPlanBuilder::scan_with_filters(
497                    table_name,
498                    provider_as_source(Arc::new(provider)),
499                    projection,
500                    filters,
501                )?
502                .build()
503            }
504            LogicalPlanType::CustomScan(scan) => {
505                let schema: Schema = convert_required!(scan.schema)?;
506                let schema = Arc::new(schema);
507                let mut projection = None;
508                if let Some(columns) = &scan.projection {
509                    let column_indices = columns
510                        .columns
511                        .iter()
512                        .map(|name| schema.index_of(name))
513                        .collect::<Result<Vec<usize>, _>>()?;
514                    projection = Some(column_indices);
515                }
516
517                let filters =
518                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
519
520                let table_name =
521                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
522
523                let provider = extension_codec.try_decode_table_provider(
524                    &scan.custom_table_data,
525                    &table_name,
526                    schema,
527                    ctx,
528                )?;
529
530                LogicalPlanBuilder::scan_with_filters(
531                    table_name,
532                    provider_as_source(provider),
533                    projection,
534                    filters,
535                )?
536                .build()
537            }
538            LogicalPlanType::Sort(sort) => {
539                let input: LogicalPlan =
540                    into_logical_plan!(sort.input, ctx, extension_codec)?;
541                let sort_expr: Vec<SortExpr> =
542                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
543                let fetch: Option<usize> = sort.fetch.try_into().ok();
544                LogicalPlanBuilder::from(input)
545                    .sort_with_limit(sort_expr, fetch)?
546                    .build()
547            }
548            LogicalPlanType::Repartition(repartition) => {
549                use datafusion::logical_expr::Partitioning;
550                let input: LogicalPlan =
551                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
552                use protobuf::repartition_node::PartitionMethod;
553                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
554                    internal_datafusion_err!(
555                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
556                    )
557                })?;
558
559                let partitioning_scheme = match pb_partition_method {
560                    PartitionMethod::Hash(protobuf::HashRepartition {
561                        hash_expr: pb_hash_expr,
562                        partition_count,
563                    }) => Partitioning::Hash(
564                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
565                        *partition_count as usize,
566                    ),
567                    PartitionMethod::RoundRobin(partition_count) => {
568                        Partitioning::RoundRobinBatch(*partition_count as usize)
569                    }
570                };
571
572                LogicalPlanBuilder::from(input)
573                    .repartition(partitioning_scheme)?
574                    .build()
575            }
576            LogicalPlanType::EmptyRelation(empty_relation) => {
577                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
578            }
579            LogicalPlanType::CreateExternalTable(create_extern_table) => {
580                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
581                    DataFusionError::Internal(String::from(
582                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
583                    ))
584                })?;
585
586                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
587                    DataFusionError::Internal(String::from(
588                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
589                    ))
590                })?;
591                let definition = if !create_extern_table.definition.is_empty() {
592                    Some(create_extern_table.definition.clone())
593                } else {
594                    None
595                };
596
597                let file_type = create_extern_table.file_type.as_str();
598                if ctx.table_factory(file_type).is_none() {
599                    internal_err!("No TableProviderFactory for file type: {file_type}")?
600                }
601
602                let mut order_exprs = vec![];
603                for expr in &create_extern_table.order_exprs {
604                    order_exprs.push(from_proto::parse_sorts(
605                        &expr.sort_expr_nodes,
606                        ctx,
607                        extension_codec,
608                    )?);
609                }
610
611                let mut column_defaults =
612                    HashMap::with_capacity(create_extern_table.column_defaults.len());
613                for (col_name, expr) in &create_extern_table.column_defaults {
614                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
615                    column_defaults.insert(col_name.clone(), expr);
616                }
617
618                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
619                    CreateExternalTable {
620                        schema: pb_schema.try_into()?,
621                        name: from_table_reference(
622                            create_extern_table.name.as_ref(),
623                            "CreateExternalTable",
624                        )?,
625                        location: create_extern_table.location.clone(),
626                        file_type: create_extern_table.file_type.clone(),
627                        table_partition_cols: create_extern_table
628                            .table_partition_cols
629                            .clone(),
630                        order_exprs,
631                        if_not_exists: create_extern_table.if_not_exists,
632                        temporary: create_extern_table.temporary,
633                        definition,
634                        unbounded: create_extern_table.unbounded,
635                        options: create_extern_table.options.clone(),
636                        constraints: constraints.into(),
637                        column_defaults,
638                    },
639                )))
640            }
641            LogicalPlanType::CreateView(create_view) => {
642                let plan = create_view
643                    .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
644                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
645                )))?
646                    .try_into_logical_plan(ctx, extension_codec)?;
647                let definition = if !create_view.definition.is_empty() {
648                    Some(create_view.definition.clone())
649                } else {
650                    None
651                };
652
653                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
654                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
655                    temporary: create_view.temporary,
656                    input: Arc::new(plan),
657                    or_replace: create_view.or_replace,
658                    definition,
659                })))
660            }
661            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
662                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
663                    DataFusionError::Internal(String::from(
664                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
665                    ))
666                })?;
667
668                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
669                    CreateCatalogSchema {
670                        schema_name: create_catalog_schema.schema_name.clone(),
671                        if_not_exists: create_catalog_schema.if_not_exists,
672                        schema: pb_schema.try_into()?,
673                    },
674                )))
675            }
676            LogicalPlanType::CreateCatalog(create_catalog) => {
677                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
678                    DataFusionError::Internal(String::from(
679                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
680                    ))
681                })?;
682
683                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
684                    CreateCatalog {
685                        catalog_name: create_catalog.catalog_name.clone(),
686                        if_not_exists: create_catalog.if_not_exists,
687                        schema: pb_schema.try_into()?,
688                    },
689                )))
690            }
691            LogicalPlanType::Analyze(analyze) => {
692                let input: LogicalPlan =
693                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
694                LogicalPlanBuilder::from(input)
695                    .explain(analyze.verbose, true)?
696                    .build()
697            }
698            LogicalPlanType::Explain(explain) => {
699                let input: LogicalPlan =
700                    into_logical_plan!(explain.input, ctx, extension_codec)?;
701                LogicalPlanBuilder::from(input)
702                    .explain(explain.verbose, false)?
703                    .build()
704            }
705            LogicalPlanType::SubqueryAlias(aliased_relation) => {
706                let input: LogicalPlan =
707                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
708                let alias = from_table_reference(
709                    aliased_relation.alias.as_ref(),
710                    "SubqueryAlias",
711                )?;
712                LogicalPlanBuilder::from(input).alias(alias)?.build()
713            }
714            LogicalPlanType::Limit(limit) => {
715                let input: LogicalPlan =
716                    into_logical_plan!(limit.input, ctx, extension_codec)?;
717                let skip = limit.skip.max(0) as usize;
718
719                let fetch = if limit.fetch < 0 {
720                    None
721                } else {
722                    Some(limit.fetch as usize)
723                };
724
725                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
726            }
727            LogicalPlanType::Join(join) => {
728                let left_keys: Vec<Expr> =
729                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
730                let right_keys: Vec<Expr> =
731                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
732                let join_type =
733                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
734                        proto_error(format!(
735                            "Received a JoinNode message with unknown JoinType {}",
736                            join.join_type
737                        ))
738                    })?;
739                let join_constraint = protobuf::JoinConstraint::try_from(
740                    join.join_constraint,
741                )
742                .map_err(|_| {
743                    proto_error(format!(
744                        "Received a JoinNode message with unknown JoinConstraint {}",
745                        join.join_constraint
746                    ))
747                })?;
748                let filter: Option<Expr> = join
749                    .filter
750                    .as_ref()
751                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
752                    .map_or(Ok(None), |v| v.map(Some))?;
753
754                let builder = LogicalPlanBuilder::from(into_logical_plan!(
755                    join.left,
756                    ctx,
757                    extension_codec
758                )?);
759                let builder = match join_constraint.into() {
760                    JoinConstraint::On => builder.join_with_expr_keys(
761                        into_logical_plan!(join.right, ctx, extension_codec)?,
762                        join_type.into(),
763                        (left_keys, right_keys),
764                        filter,
765                    )?,
766                    JoinConstraint::Using => {
767                        // The equijoin keys in using-join must be column.
768                        let using_keys = left_keys
769                            .into_iter()
770                            .map(|key| {
771                                key.try_as_col().cloned()
772                                    .ok_or_else(|| internal_datafusion_err!(
773                                        "Using join keys must be column references, got: {key:?}"
774                                    ))
775                            })
776                            .collect::<Result<Vec<_>, _>>()?;
777                        builder.join_using(
778                            into_logical_plan!(join.right, ctx, extension_codec)?,
779                            join_type.into(),
780                            using_keys,
781                        )?
782                    }
783                };
784
785                builder.build()
786            }
787            LogicalPlanType::Union(union) => {
788                if union.inputs.len() < 2 {
789                    return  Err( DataFusionError::Internal(String::from(
790                        "Protobuf deserialization error, Union was require at least two input.",
791                    )));
792                }
793                let (first, rest) = union.inputs.split_first().unwrap();
794                let mut builder = LogicalPlanBuilder::from(
795                    first.try_into_logical_plan(ctx, extension_codec)?,
796                );
797
798                for i in rest {
799                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
800                    builder = builder.union(plan)?;
801                }
802                builder.build()
803            }
804            LogicalPlanType::CrossJoin(crossjoin) => {
805                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
806                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
807
808                LogicalPlanBuilder::from(left).cross_join(right)?.build()
809            }
810            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
811                let input_plans: Vec<LogicalPlan> = inputs
812                    .iter()
813                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
814                    .collect::<Result<_>>()?;
815
816                let extension_node =
817                    extension_codec.try_decode(node, &input_plans, ctx)?;
818                Ok(LogicalPlan::Extension(extension_node))
819            }
820            LogicalPlanType::Distinct(distinct) => {
821                let input: LogicalPlan =
822                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
823                LogicalPlanBuilder::from(input).distinct()?.build()
824            }
825            LogicalPlanType::DistinctOn(distinct_on) => {
826                let input: LogicalPlan =
827                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
828                let on_expr =
829                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
830                let select_expr = from_proto::parse_exprs(
831                    &distinct_on.select_expr,
832                    ctx,
833                    extension_codec,
834                )?;
835                let sort_expr = match distinct_on.sort_expr.len() {
836                    0 => None,
837                    _ => Some(from_proto::parse_sorts(
838                        &distinct_on.sort_expr,
839                        ctx,
840                        extension_codec,
841                    )?),
842                };
843                LogicalPlanBuilder::from(input)
844                    .distinct_on(on_expr, select_expr, sort_expr)?
845                    .build()
846            }
847            LogicalPlanType::ViewScan(scan) => {
848                let schema: Schema = convert_required!(scan.schema)?;
849
850                let mut projection = None;
851                if let Some(columns) = &scan.projection {
852                    let column_indices = columns
853                        .columns
854                        .iter()
855                        .map(|name| schema.index_of(name))
856                        .collect::<Result<Vec<usize>, _>>()?;
857                    projection = Some(column_indices);
858                }
859
860                let input: LogicalPlan =
861                    into_logical_plan!(scan.input, ctx, extension_codec)?;
862
863                let definition = if !scan.definition.is_empty() {
864                    Some(scan.definition.clone())
865                } else {
866                    None
867                };
868
869                let provider = ViewTable::new(input, definition);
870
871                let table_name =
872                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
873
874                LogicalPlanBuilder::scan(
875                    table_name,
876                    provider_as_source(Arc::new(provider)),
877                    projection,
878                )?
879                .build()
880            }
881            LogicalPlanType::Prepare(prepare) => {
882                let input: LogicalPlan =
883                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
884                let data_types: Vec<DataType> = prepare
885                    .data_types
886                    .iter()
887                    .map(DataType::try_from)
888                    .collect::<Result<_, _>>()?;
889                LogicalPlanBuilder::from(input)
890                    .prepare(prepare.name.clone(), data_types)?
891                    .build()
892            }
893            LogicalPlanType::DropView(dropview) => {
894                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
895                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
896                    if_exists: dropview.if_exists,
897                    schema: Arc::new(convert_required!(dropview.schema)?),
898                })))
899            }
900            LogicalPlanType::CopyTo(copy) => {
901                let input: LogicalPlan =
902                    into_logical_plan!(copy.input, ctx, extension_codec)?;
903
904                let file_type: Arc<dyn FileType> = format_as_file_type(
905                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
906                );
907
908                Ok(LogicalPlan::Copy(dml::CopyTo {
909                    input: Arc::new(input),
910                    output_url: copy.output_url.clone(),
911                    partition_by: copy.partition_by.clone(),
912                    file_type,
913                    options: Default::default(),
914                }))
915            }
916            LogicalPlanType::Unnest(unnest) => {
917                let input: LogicalPlan =
918                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
919                Ok(LogicalPlan::Unnest(Unnest {
920                    input: Arc::new(input),
921                    exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(),
922                    list_type_columns: unnest
923                        .list_type_columns
924                        .iter()
925                        .map(|c| {
926                            let recursion_item = c.recursion.as_ref().unwrap();
927                            (
928                                c.input_index as _,
929                                ColumnUnnestList {
930                                    output_column: recursion_item
931                                        .output_column
932                                        .as_ref()
933                                        .unwrap()
934                                        .into(),
935                                    depth: recursion_item.depth as _,
936                                },
937                            )
938                        })
939                        .collect(),
940                    struct_type_columns: unnest
941                        .struct_type_columns
942                        .iter()
943                        .map(|c| *c as usize)
944                        .collect(),
945                    dependency_indices: unnest
946                        .dependency_indices
947                        .iter()
948                        .map(|c| *c as usize)
949                        .collect(),
950                    schema: Arc::new(convert_required!(unnest.schema)?),
951                    options: into_required!(unnest.options)?,
952                }))
953            }
954            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
955                let static_term = recursive_query_node
956                    .static_term
957                    .as_ref()
958                    .ok_or_else(|| DataFusionError::Internal(String::from(
959                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
960                    )))?
961                    .try_into_logical_plan(ctx, extension_codec)?;
962
963                let recursive_term = recursive_query_node
964                    .recursive_term
965                    .as_ref()
966                    .ok_or_else(|| DataFusionError::Internal(String::from(
967                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
968                    )))?
969                    .try_into_logical_plan(ctx, extension_codec)?;
970
971                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
972                    name: recursive_query_node.name.clone(),
973                    static_term: Arc::new(static_term),
974                    recursive_term: Arc::new(recursive_term),
975                    is_distinct: recursive_query_node.is_distinct,
976                }))
977            }
978            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
979                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
980                let schema = convert_required!(*schema)?;
981                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
982                LogicalPlanBuilder::scan(
983                    name.as_str(),
984                    provider_as_source(Arc::new(cte_work_table)),
985                    None,
986                )?
987                .build()
988            }
989            LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
990                datafusion::logical_expr::DmlStatement::new(
991                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
992                    to_table_source(&dml_node.target, ctx, extension_codec)?,
993                    dml_node.dml_type().into(),
994                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
995                ),
996            )),
997        }
998    }
999
1000    fn try_from_logical_plan(
1001        plan: &LogicalPlan,
1002        extension_codec: &dyn LogicalExtensionCodec,
1003    ) -> Result<Self>
1004    where
1005        Self: Sized,
1006    {
1007        match plan {
1008            LogicalPlan::Values(Values { values, .. }) => {
1009                let n_cols = if values.is_empty() {
1010                    0
1011                } else {
1012                    values[0].len()
1013                } as u64;
1014                let values_list =
1015                    serialize_exprs(values.iter().flatten(), extension_codec)?;
1016                Ok(LogicalPlanNode {
1017                    logical_plan_type: Some(LogicalPlanType::Values(
1018                        protobuf::ValuesNode {
1019                            n_cols,
1020                            values_list,
1021                        },
1022                    )),
1023                })
1024            }
            // Serialize a table scan. The concrete provider behind `source` selects the
            // protobuf node: ListingTable -> ListingScan, ViewTable -> ViewScan,
            // CteWorkTable -> CteWorkTableScan; any other provider is encoded by the
            // extension codec into a CustomScan node.
            LogicalPlan::TableScan(TableScan {
                table_name,
                source,
                filters,
                projection,
                ..
            }) => {
                let provider = source_as_provider(source)?;
                let schema = provider.schema();
                let source = provider.as_any();

                // Projections are serialized as column names, resolved by index against
                // the provider's schema.
                let projection = match projection {
                    None => None,
                    Some(columns) => {
                        let column_names = columns
                            .iter()
                            .map(|i| schema.field(*i).name().to_owned())
                            .collect();
                        Some(protobuf::ProjectionColumns {
                            columns: column_names,
                        })
                    }
                };

                let filters: Vec<protobuf::LogicalExprNode> =
                    serialize_exprs(filters, extension_codec)?;

                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
                    // Map the listing table's file format to its protobuf variant. Each
                    // check below is independent (some are feature-gated); if none
                    // matches, the format cannot be serialized and an error is returned.
                    let any = listing_table.options().format.as_any();
                    let file_format_type = {
                        let mut maybe_some_type = None;

                        #[cfg(feature = "parquet")]
                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
                            let options = parquet.options();
                            maybe_some_type =
                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
                                    options: Some(options.try_into()?),
                                }));
                        };

                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
                            let options = csv.options();
                            maybe_some_type =
                                Some(FileFormatType::Csv(protobuf::CsvFormat {
                                    options: Some(options.try_into()?),
                                }));
                        }

                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
                            let options = json.options();
                            maybe_some_type =
                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
                                    options: Some(options.try_into()?),
                                }))
                        }

                        #[cfg(feature = "avro")]
                        if any.is::<AvroFormat>() {
                            maybe_some_type =
                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
                        }

                        if let Some(file_format_type) = maybe_some_type {
                            file_format_type
                        } else {
                            return Err(proto_error(format!(
                            "Error converting file format, {:?} is invalid as a datafusion format.",
                            listing_table.options().format
                        )));
                        }
                    };

                    let options = listing_table.options();

                    // Partition columns are serialized separately (as
                    // `table_partition_cols`), so strip them from the schema. Iterating
                    // in reverse keeps the remaining indices valid after each removal.
                    let mut builder = SchemaBuilder::from(schema.as_ref());
                    for (idx, field) in schema.fields().iter().enumerate().rev() {
                        if options
                            .table_partition_cols
                            .iter()
                            .any(|(name, _)| name == field.name())
                        {
                            builder.remove(idx);
                        }
                    }

                    let schema = builder.finish();

                    let schema: protobuf::Schema = (&schema).try_into()?;

                    // One SortExprNodeCollection per declared file sort order.
                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
                    for order in &options.file_sort_order {
                        let expr_vec = SortExprNodeCollection {
                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                        };
                        exprs_vec.push(expr_vec);
                    }

                    // Partition columns are sent as (name, arrow type) pairs.
                    let partition_columns = options
                        .table_partition_cols
                        .iter()
                        .map(|(name, arrow_type)| {
                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
                                .map_err(|e| {
                                    proto_error(format!(
                                        "Received an unknown ArrowType: {e}"
                                    ))
                                })?;
                            Ok(protobuf::PartitionColumn {
                                name: name.clone(),
                                arrow_type: Some(arrow_type),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ListingScan(
                            protobuf::ListingTableScanNode {
                                file_format_type: Some(file_format_type),
                                table_name: Some(table_name.clone().into()),
                                collect_stat: options.collect_stat,
                                file_extension: options.file_extension.clone(),
                                table_partition_cols: partition_columns,
                                paths: listing_table
                                    .table_paths()
                                    .iter()
                                    .map(|x| x.to_string())
                                    .collect(),
                                schema: Some(schema),
                                projection,
                                filters,
                                target_partitions: options.target_partitions as u32,
                                file_sort_order: exprs_vec,
                            },
                        )),
                    })
                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
                    // Views embed their defining logical plan and SQL definition (an
                    // empty string when the view has none).
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                            protobuf::ViewTableScanNode {
                                table_name: Some(table_name.clone().into()),
                                input: Some(Box::new(
                                    LogicalPlanNode::try_from_logical_plan(
                                        view_table.logical_plan(),
                                        extension_codec,
                                    )?,
                                )),
                                schema: Some(schema),
                                projection,
                                definition: view_table
                                    .definition()
                                    .map(|s| s.to_string())
                                    .unwrap_or_default(),
                            },
                        ))),
                    })
                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
                {
                    // Only the work table's name and schema are written; the scan's
                    // projection and filters are not part of CteWorkTableScanNode.
                    let name = cte_work_table.name().to_string();
                    let schema = cte_work_table.schema();
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
                            protobuf::CteWorkTableScanNode {
                                name,
                                schema: Some(schema),
                            },
                        )),
                    })
                } else {
                    // Unknown provider: let the extension codec encode it into opaque
                    // bytes carried by a CustomScan node.
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    let mut bytes = vec![];
                    extension_codec
                        .try_encode_table_provider(table_name, provider, &mut bytes)
                        .map_err(|e| context!("Error serializing custom table", e))?;
                    let scan = CustomScan(CustomTableScanNode {
                        table_name: Some(table_name.clone().into()),
                        projection,
                        schema: Some(schema),
                        filters,
                        custom_table_data: bytes,
                    });
                    let node = LogicalPlanNode {
                        logical_plan_type: Some(scan),
                    };
                    Ok(node)
                }
            }
1215            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1216                Ok(LogicalPlanNode {
1217                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1218                        protobuf::ProjectionNode {
1219                            input: Some(Box::new(
1220                                LogicalPlanNode::try_from_logical_plan(
1221                                    input.as_ref(),
1222                                    extension_codec,
1223                                )?,
1224                            )),
1225                            expr: serialize_exprs(expr, extension_codec)?,
1226                            optional_alias: None,
1227                        },
1228                    ))),
1229                })
1230            }
1231            LogicalPlan::Filter(filter) => {
1232                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1233                    filter.input.as_ref(),
1234                    extension_codec,
1235                )?;
1236                Ok(LogicalPlanNode {
1237                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1238                        protobuf::SelectionNode {
1239                            input: Some(Box::new(input)),
1240                            expr: Some(serialize_expr(
1241                                &filter.predicate,
1242                                extension_codec,
1243                            )?),
1244                        },
1245                    ))),
1246                })
1247            }
1248            LogicalPlan::Distinct(Distinct::All(input)) => {
1249                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1250                    input.as_ref(),
1251                    extension_codec,
1252                )?;
1253                Ok(LogicalPlanNode {
1254                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1255                        protobuf::DistinctNode {
1256                            input: Some(Box::new(input)),
1257                        },
1258                    ))),
1259                })
1260            }
1261            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1262                on_expr,
1263                select_expr,
1264                sort_expr,
1265                input,
1266                ..
1267            })) => {
1268                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1269                    input.as_ref(),
1270                    extension_codec,
1271                )?;
1272                let sort_expr = match sort_expr {
1273                    None => vec![],
1274                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1275                };
1276                Ok(LogicalPlanNode {
1277                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1278                        protobuf::DistinctOnNode {
1279                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1280                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1281                            sort_expr,
1282                            input: Some(Box::new(input)),
1283                        },
1284                    ))),
1285                })
1286            }
1287            LogicalPlan::Window(Window {
1288                input, window_expr, ..
1289            }) => {
1290                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1291                    input.as_ref(),
1292                    extension_codec,
1293                )?;
1294                Ok(LogicalPlanNode {
1295                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1296                        protobuf::WindowNode {
1297                            input: Some(Box::new(input)),
1298                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1299                        },
1300                    ))),
1301                })
1302            }
1303            LogicalPlan::Aggregate(Aggregate {
1304                group_expr,
1305                aggr_expr,
1306                input,
1307                ..
1308            }) => {
1309                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1310                    input.as_ref(),
1311                    extension_codec,
1312                )?;
1313                Ok(LogicalPlanNode {
1314                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1315                        protobuf::AggregateNode {
1316                            input: Some(Box::new(input)),
1317                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1318                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1319                        },
1320                    ))),
1321                })
1322            }
1323            LogicalPlan::Join(Join {
1324                left,
1325                right,
1326                on,
1327                filter,
1328                join_type,
1329                join_constraint,
1330                null_equals_null,
1331                ..
1332            }) => {
1333                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1334                    left.as_ref(),
1335                    extension_codec,
1336                )?;
1337                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1338                    right.as_ref(),
1339                    extension_codec,
1340                )?;
1341                let (left_join_key, right_join_key) = on
1342                    .iter()
1343                    .map(|(l, r)| {
1344                        Ok((
1345                            serialize_expr(l, extension_codec)?,
1346                            serialize_expr(r, extension_codec)?,
1347                        ))
1348                    })
1349                    .collect::<Result<Vec<_>, ToProtoError>>()?
1350                    .into_iter()
1351                    .unzip();
1352                let join_type: protobuf::JoinType = join_type.to_owned().into();
1353                let join_constraint: protobuf::JoinConstraint =
1354                    join_constraint.to_owned().into();
1355                let filter = filter
1356                    .as_ref()
1357                    .map(|e| serialize_expr(e, extension_codec))
1358                    .map_or(Ok(None), |v| v.map(Some))?;
1359                Ok(LogicalPlanNode {
1360                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1361                        protobuf::JoinNode {
1362                            left: Some(Box::new(left)),
1363                            right: Some(Box::new(right)),
1364                            join_type: join_type.into(),
1365                            join_constraint: join_constraint.into(),
1366                            left_join_key,
1367                            right_join_key,
1368                            null_equals_null: *null_equals_null,
1369                            filter,
1370                        },
1371                    ))),
1372                })
1373            }
1374            LogicalPlan::Subquery(_) => {
1375                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1376            }
1377            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1378                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1379                    input.as_ref(),
1380                    extension_codec,
1381                )?;
1382                Ok(LogicalPlanNode {
1383                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1384                        protobuf::SubqueryAliasNode {
1385                            input: Some(Box::new(input)),
1386                            alias: Some((*alias).clone().into()),
1387                        },
1388                    ))),
1389                })
1390            }
1391            LogicalPlan::Limit(limit) => {
1392                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1393                    limit.input.as_ref(),
1394                    extension_codec,
1395                )?;
1396                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1397                    return Err(proto_error(
1398                        "LogicalPlan::Limit only supports literal skip values",
1399                    ));
1400                };
1401                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1402                    return Err(proto_error(
1403                        "LogicalPlan::Limit only supports literal fetch values",
1404                    ));
1405                };
1406
1407                Ok(LogicalPlanNode {
1408                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1409                        protobuf::LimitNode {
1410                            input: Some(Box::new(input)),
1411                            skip: skip as i64,
1412                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1413                        },
1414                    ))),
1415                })
1416            }
1417            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1418                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1419                    input.as_ref(),
1420                    extension_codec,
1421                )?;
1422                let sort_expr: Vec<protobuf::SortExprNode> =
1423                    serialize_sorts(expr, extension_codec)?;
1424                Ok(LogicalPlanNode {
1425                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1426                        protobuf::SortNode {
1427                            input: Some(Box::new(input)),
1428                            expr: sort_expr,
1429                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1430                        },
1431                    ))),
1432                })
1433            }
1434            LogicalPlan::Repartition(Repartition {
1435                input,
1436                partitioning_scheme,
1437            }) => {
1438                use datafusion::logical_expr::Partitioning;
1439                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1440                    input.as_ref(),
1441                    extension_codec,
1442                )?;
1443
1444                // Assumed common usize field was batch size
1445                // Used u64 to avoid any nastyness involving large values, most data clusters are probably uniformly 64 bits any ways
1446                use protobuf::repartition_node::PartitionMethod;
1447
1448                let pb_partition_method = match partitioning_scheme {
1449                    Partitioning::Hash(exprs, partition_count) => {
1450                        PartitionMethod::Hash(protobuf::HashRepartition {
1451                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1452                            partition_count: *partition_count as u64,
1453                        })
1454                    }
1455                    Partitioning::RoundRobinBatch(partition_count) => {
1456                        PartitionMethod::RoundRobin(*partition_count as u64)
1457                    }
1458                    Partitioning::DistributeBy(_) => {
1459                        return not_impl_err!("DistributeBy")
1460                    }
1461                };
1462
1463                Ok(LogicalPlanNode {
1464                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1465                        protobuf::RepartitionNode {
1466                            input: Some(Box::new(input)),
1467                            partition_method: Some(pb_partition_method),
1468                        },
1469                    ))),
1470                })
1471            }
1472            LogicalPlan::EmptyRelation(EmptyRelation {
1473                produce_one_row, ..
1474            }) => Ok(LogicalPlanNode {
1475                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1476                    protobuf::EmptyRelationNode {
1477                        produce_one_row: *produce_one_row,
1478                    },
1479                )),
1480            }),
1481            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1482                CreateExternalTable {
1483                    name,
1484                    location,
1485                    file_type,
1486                    schema: df_schema,
1487                    table_partition_cols,
1488                    if_not_exists,
1489                    definition,
1490                    order_exprs,
1491                    unbounded,
1492                    options,
1493                    constraints,
1494                    column_defaults,
1495                    temporary,
1496                },
1497            )) => {
1498                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1499                for order in order_exprs {
1500                    let temp = SortExprNodeCollection {
1501                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1502                    };
1503                    converted_order_exprs.push(temp);
1504                }
1505
1506                let mut converted_column_defaults =
1507                    HashMap::with_capacity(column_defaults.len());
1508                for (col_name, expr) in column_defaults {
1509                    converted_column_defaults
1510                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1511                }
1512
1513                Ok(LogicalPlanNode {
1514                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1515                        protobuf::CreateExternalTableNode {
1516                            name: Some(name.clone().into()),
1517                            location: location.clone(),
1518                            file_type: file_type.clone(),
1519                            schema: Some(df_schema.try_into()?),
1520                            table_partition_cols: table_partition_cols.clone(),
1521                            if_not_exists: *if_not_exists,
1522                            temporary: *temporary,
1523                            order_exprs: converted_order_exprs,
1524                            definition: definition.clone().unwrap_or_default(),
1525                            unbounded: *unbounded,
1526                            options: options.clone(),
1527                            constraints: Some(constraints.clone().into()),
1528                            column_defaults: converted_column_defaults,
1529                        },
1530                    )),
1531                })
1532            }
1533            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1534                name,
1535                input,
1536                or_replace,
1537                definition,
1538                temporary,
1539            })) => Ok(LogicalPlanNode {
1540                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1541                    protobuf::CreateViewNode {
1542                        name: Some(name.clone().into()),
1543                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1544                            input,
1545                            extension_codec,
1546                        )?)),
1547                        or_replace: *or_replace,
1548                        temporary: *temporary,
1549                        definition: definition.clone().unwrap_or_default(),
1550                    },
1551                ))),
1552            }),
1553            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1554                CreateCatalogSchema {
1555                    schema_name,
1556                    if_not_exists,
1557                    schema: df_schema,
1558                },
1559            )) => Ok(LogicalPlanNode {
1560                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1561                    protobuf::CreateCatalogSchemaNode {
1562                        schema_name: schema_name.clone(),
1563                        if_not_exists: *if_not_exists,
1564                        schema: Some(df_schema.try_into()?),
1565                    },
1566                )),
1567            }),
1568            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1569                catalog_name,
1570                if_not_exists,
1571                schema: df_schema,
1572            })) => Ok(LogicalPlanNode {
1573                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1574                    protobuf::CreateCatalogNode {
1575                        catalog_name: catalog_name.clone(),
1576                        if_not_exists: *if_not_exists,
1577                        schema: Some(df_schema.try_into()?),
1578                    },
1579                )),
1580            }),
1581            LogicalPlan::Analyze(a) => {
1582                let input = LogicalPlanNode::try_from_logical_plan(
1583                    a.input.as_ref(),
1584                    extension_codec,
1585                )?;
1586                Ok(LogicalPlanNode {
1587                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1588                        protobuf::AnalyzeNode {
1589                            input: Some(Box::new(input)),
1590                            verbose: a.verbose,
1591                        },
1592                    ))),
1593                })
1594            }
1595            LogicalPlan::Explain(a) => {
1596                let input = LogicalPlanNode::try_from_logical_plan(
1597                    a.plan.as_ref(),
1598                    extension_codec,
1599                )?;
1600                Ok(LogicalPlanNode {
1601                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1602                        protobuf::ExplainNode {
1603                            input: Some(Box::new(input)),
1604                            verbose: a.verbose,
1605                        },
1606                    ))),
1607                })
1608            }
1609            LogicalPlan::Union(union) => {
1610                let inputs: Vec<LogicalPlanNode> = union
1611                    .inputs
1612                    .iter()
1613                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1614                    .collect::<Result<_>>()?;
1615                Ok(LogicalPlanNode {
1616                    logical_plan_type: Some(LogicalPlanType::Union(
1617                        protobuf::UnionNode { inputs },
1618                    )),
1619                })
1620            }
1621            LogicalPlan::Extension(extension) => {
1622                let mut buf: Vec<u8> = vec![];
1623                extension_codec.try_encode(extension, &mut buf)?;
1624
1625                let inputs: Vec<LogicalPlanNode> = extension
1626                    .node
1627                    .inputs()
1628                    .iter()
1629                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1630                    .collect::<Result<_>>()?;
1631
1632                Ok(LogicalPlanNode {
1633                    logical_plan_type: Some(LogicalPlanType::Extension(
1634                        LogicalExtensionNode { node: buf, inputs },
1635                    )),
1636                })
1637            }
1638            LogicalPlan::Statement(Statement::Prepare(Prepare {
1639                name,
1640                data_types,
1641                input,
1642            })) => {
1643                let input =
1644                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1645                Ok(LogicalPlanNode {
1646                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1647                        protobuf::PrepareNode {
1648                            name: name.clone(),
1649                            data_types: data_types
1650                                .iter()
1651                                .map(|t| t.try_into())
1652                                .collect::<Result<Vec<_>, _>>()?,
1653                            input: Some(Box::new(input)),
1654                        },
1655                    ))),
1656                })
1657            }
1658            LogicalPlan::Unnest(Unnest {
1659                input,
1660                exec_columns,
1661                list_type_columns,
1662                struct_type_columns,
1663                dependency_indices,
1664                schema,
1665                options,
1666            }) => {
1667                let input =
1668                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1669                let proto_unnest_list_items = list_type_columns
1670                    .iter()
1671                    .map(|(index, ul)| ColumnUnnestListItem {
1672                        input_index: *index as _,
1673                        recursion: Some(ColumnUnnestListRecursion {
1674                            output_column: Some(ul.output_column.to_owned().into()),
1675                            depth: ul.depth as _,
1676                        }),
1677                    })
1678                    .collect();
1679                Ok(LogicalPlanNode {
1680                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1681                        protobuf::UnnestNode {
1682                            input: Some(Box::new(input)),
1683                            exec_columns: exec_columns
1684                                .iter()
1685                                .map(|col| col.into())
1686                                .collect(),
1687                            list_type_columns: proto_unnest_list_items,
1688                            struct_type_columns: struct_type_columns
1689                                .iter()
1690                                .map(|c| *c as u64)
1691                                .collect(),
1692                            dependency_indices: dependency_indices
1693                                .iter()
1694                                .map(|c| *c as u64)
1695                                .collect(),
1696                            schema: Some(schema.try_into()?),
1697                            options: Some(options.into()),
1698                        },
1699                    ))),
1700                })
1701            }
1702            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1703                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1704            )),
1705            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1706                "LogicalPlan serde is not yet implemented for CreateIndex",
1707            )),
1708            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1709                "LogicalPlan serde is not yet implemented for DropTable",
1710            )),
1711            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1712                name,
1713                if_exists,
1714                schema,
1715            })) => Ok(LogicalPlanNode {
1716                logical_plan_type: Some(LogicalPlanType::DropView(
1717                    protobuf::DropViewNode {
1718                        name: Some(name.clone().into()),
1719                        if_exists: *if_exists,
1720                        schema: Some(schema.try_into()?),
1721                    },
1722                )),
1723            }),
1724            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1725                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1726            )),
1727            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1728                "LogicalPlan serde is not yet implemented for CreateFunction",
1729            )),
1730            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1731                "LogicalPlan serde is not yet implemented for DropFunction",
1732            )),
1733            LogicalPlan::Statement(_) => Err(proto_error(
1734                "LogicalPlan serde is not yet implemented for Statement",
1735            )),
1736            LogicalPlan::Dml(DmlStatement {
1737                table_name,
1738                target,
1739                op,
1740                input,
1741                ..
1742            }) => {
1743                let input =
1744                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1745                let dml_type: dml_node::Type = op.into();
1746                Ok(LogicalPlanNode {
1747                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1748                        input: Some(Box::new(input)),
1749                        target: Some(Box::new(from_table_source(
1750                            table_name.clone(),
1751                            Arc::clone(target),
1752                            extension_codec,
1753                        )?)),
1754                        table_name: Some(table_name.clone().into()),
1755                        dml_type: dml_type.into(),
1756                    }))),
1757                })
1758            }
1759            LogicalPlan::Copy(dml::CopyTo {
1760                input,
1761                output_url,
1762                file_type,
1763                partition_by,
1764                ..
1765            }) => {
1766                let input =
1767                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1768                let mut buf = Vec::new();
1769                extension_codec
1770                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1771
1772                Ok(LogicalPlanNode {
1773                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1774                        protobuf::CopyToNode {
1775                            input: Some(Box::new(input)),
1776                            output_url: output_url.to_string(),
1777                            file_type: buf,
1778                            partition_by: partition_by.clone(),
1779                        },
1780                    ))),
1781                })
1782            }
1783            LogicalPlan::DescribeTable(_) => Err(proto_error(
1784                "LogicalPlan serde is not yet implemented for DescribeTable",
1785            )),
1786            LogicalPlan::RecursiveQuery(recursive) => {
1787                let static_term = LogicalPlanNode::try_from_logical_plan(
1788                    recursive.static_term.as_ref(),
1789                    extension_codec,
1790                )?;
1791                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1792                    recursive.recursive_term.as_ref(),
1793                    extension_codec,
1794                )?;
1795
1796                Ok(LogicalPlanNode {
1797                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1798                        protobuf::RecursiveQueryNode {
1799                            name: recursive.name.clone(),
1800                            static_term: Some(Box::new(static_term)),
1801                            recursive_term: Some(Box::new(recursive_term)),
1802                            is_distinct: recursive.is_distinct,
1803                        },
1804                    ))),
1805                })
1806            }
1807        }
1808    }
1809}